七叶笔记 » golang编程 » linux开发技术之线程池accept处理高并发connect(含源码)

linux开发技术之线程池accept处理高并发connect(含源码)

前言

服务器在调用listen和accept后,就会阻塞在accept函数上,accpet函数返回后循环调用accept函数等待客户的TCP连接。

我们知道服务器段listen套接字能处理的连接数与监听队列的大小有关,如果这时候又大量的用户并发发起connec连接,那么在listen有队列上限(最大可接受TCP的连接数)的情况下,有多少个connect会成功了。

试验证明,当连接数远远高于listen的可连接数上限时,客户端的大部分TCP请求会被抛弃,只有当listen监听队列空闲或者放弃某个连接时,才可以接收新的连接

那么我们当并发的连接数较大时,服务器应该如何来避免这种情况出现?

客户端模拟高并发

客户端如何模拟高并发呢?

客户端运行初期完成所设定的一定量的 socket 创建和相应的处理 线程 的创建,然后使用条件变量来完成线程同步,直到最后一个线程创建完成,才向所有线程发出广播通知,让所有线程并发调用connect(这样相当于所有的客户端在同一时间请求连接,即高并发),连接成功则关闭连接,失败则返回。

linux系统中进程所能创建的线程数目?

关于 linux系统中进程所能创建的线程数目?

每个进程需要自己独立的栈空间,linux缺省的线程栈大小是10MB。在32位的机子上一个进程需要4G的内存空间,去掉自己的栈空间全局程序段空间,一般只有3G内存可以用,创建线程时就需要从这3G的空间中分配10M出来,所以最多可以分配300多个线程。

当然这里还可以使用多个进程,每个进程300个线程的方式来进一步扩大并发量。

当然64位系统中,此数值会不同,甚至会大很多

Linux最大线程数限制及当前线程数查询:

总结系统限制有:

cat /proc/sys/kernel/pid_max #查系统支持的最大线程数,一般会很大,相当于理论值

cat /proc/sys/kernel/threads-max

max_user_process(ulimit -u) #系统限制某用户下最多可以运行多少进程或线程

/proc/sys/vm/max_map_count 硬件内存大小

用线程模拟多个客户端

我们在客户端程序中,为我们模拟每个客户端创建一个线程

     pthread_t client[MAX_CLIENT_COUNT];
    for(int threadPos = 0;
        threadPos < MAX_CLIENT_COUNT;
        threadPos++)
    {
        if(pthread_create(&client[threadPos], NULL, ClientThreadFunc, serverIp) != 0)
        {
            perror("create client thread error...\n");
        }
    }  

然后在每个客户端线程中,创建并初始化客户端的套接字描述符。

  void * ClientThreadFunc(void *args)
{
    char *serverIp = (char *)args;
    /**********************************************************
     *
     *  创建并初始化套接字
     *
     **********************************************************/    struct sockaddr_in  serverAddr;          /*  服务器的套接字信息   */    int                 socketFd;                      /*  客户端的套接字信息   */    bzero(&serverAddr,  sizeof (serverAddr));             /*  全部置零               */    serverAddr.sin_family       =   AF_INET;                      /*  internet协议族         */    serverAddr.sin_addr.s_addr  =   inet_addr(serverIp);        /*  设置所连接服务器的IP   */    serverAddr.sin_port         =   htons(TCP_SERVER_PORT);       /*  设置连接的服务器端口   */    /*  开始创建套接字                        */    /*  SOCK_STREAM 面向连接的套接字,即TCP   */    socketFd                    =   socket(AF_INET, SOCK_STREAM, 0);
    if(socketFd < 0)
    {
        printf("socket error\n");
        exit(-1);
    }
    printf("Wait for connect...\n");
    pthread_cond_wait(&cond, &mut);
    pthread_mutex_unlock(&mut);
    //  信号会丢失,使得这里永远醒不了,所以需要重发信号.
    //  直到在客户端的主函数中调用pthread_cond_broadcast(&cond);
    /*  尝试连接服务器  */    if(connect(socketFd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0)
    {
        printf("Can Not Connect To %s\n", serverIp);
        exit(1);
    }
    else
    {
        printf("connect to the server %s SUCCESS...\n", serverIp);
        printf("连接服务器成功...\n");
    }
    /**********************************************************
     *
     *  下面进行正常的套接字通信
     *
     **********************************************************/    RaiseServerResponse(socketFd);
    //  关闭套接字的文件描述符
    close(socketFd);
    return EXIT_SUCCESS;
}  

用条件变量保证线程同步

条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。

当然这里有个关键问题,就是如何保证模拟出来所有的客户端同步在连接服务器之前

在这里就使用条件变量进行同步。

条件变量初始化

在connect之前设置条件变量,

注意:阻塞返回后立即解锁,防止互斥量加锁带来的阻塞

pthread_mutex_unlock(&mut);

当条件满足时,唤醒阻塞在条件变量上的线程:

 while(1)
{
    sleep(2);   //  主线程休眠两秒,等待所有的套接字创建好
    pthread_cond_broadcast(&cond);  //在所有线程创建完成后才进行唤醒。
}  

综上,客户端模拟并发过程中没有存在不同步的情况导致上述性能问题。(注意,在广播的时候,会出现广播丢失的情况,所以需要多次执行广播操作才会使得所有线程执行任务,所以某种程度上这里并不能模拟完完全全的并发)

[注意]

客户端和服务器之间的连接是在同一台机器上,使用Socket方式通信的话会经过127.0.0.1的回环线路,不会有网卡等硬件资源的访问性能消耗,所以不存在网络通信时延等问题。

服务器处理大并发

性能问题主要发生在服务器,可能是以下几部分造成:

服务器监听队列限制

服务器的监听队列listen(listenfd,xxx)

参数2指定队列内所能容纳的元素的最大值,

当来不及从队列中移除元素时(调用accept移除或者TCP自动放弃)就会造成队列满而使得一些请求丢失。

解决办法

增大队列容量是一种办法,但是注意等待队列太会带来效率的性能缺陷,而且listen函数对最大队列容量有一个上限,大小为SOMAXCONN,当然必要时刻我们可以修改这个常量的大小。

直接修改listen及相关函数的实现(比较麻烦,不建议),可以将listen所维护的队列修改为linklist,支持队列的动态增长。

accept阻塞且处理速度太慢

accept导致阻塞过长时间,使得队列无法及时清空已经完成3次连接的socket

也就是任意两个accept之间的时间间隔是关键因素,如下面代码所示(一般来说,可以通过listen之后在循环调用accept来将已完成3次握手的连接从listen所维护的队列中移除。

 listen(listenfd,10);
for(;;)
{
        if((connFd = accept(socketFd, (struct sockaddr*)&clientAddr, &length)) < 0)
        {
            printf("accept error, errno = %d...\n", errno);
            continue;
        }
//其他操作...
}  

解决办法:

* 两个accept之间尽量不要又多余的操作,使得accept返回后可以立刻执行下一个accept。经过试验,该方法可以较好的提高性能,减少connect的丢失数。

本质上这是一种“生产者-消费者”的模式,listen维护“已连接”和“待连接”的队列,当客户发出连接请求并最终连接成功时,在“已连接”队列中会生产一个“product”,然后这时候希望“消费者”也就是accept函数可以快速的从队列中消费这个“product”,这样就不会因为队列满而导致无法继续生产(也就是客户的connect会失效,导致上面队列长度10,300个并发connect带来的67个存活的情况),但是在本例情况下,我们无法控制生产者的疯狂生产行为,因为连接是客户发起的,这是不可预知的,所以我们如果想不修改listen函数来提高性能的话,那么就只能让消费者更加快的把产品消耗掉,使得listen队列可以容纳更多的新生产的产品,而第一种加快消费者消耗产品的方法就是a,第二种加快消费者消耗产品的方法是我们可以增加多几个消费者来帮忙消耗,但是这几个消费者间也要好好协调。第三种方法是让消费者把产品先移走为listen的队列腾出空间,再自行处理产品,如d所示。

使用多线程策略,每个线程独立调用accept。(花了一个上午的时候正glib的 线程池 ,一直用不了,其他都正常,就是线程不启动,不知道会不会是bug)

修改listen和accept的实现方式,让listen所维护的队列可以智能的判断拥挤情况,从而对accept的调用做出调度,在队列繁忙时,使用多线程的方式让多个accept来移除队列中的元素,在队列空闲时,可以适当的调整accept的处理线程数,这也是一种线程池的实现。

修改accept的实现方式,在accept中实现一个“消费缓冲区”,为的是及时将listen中的队列元素移动到该缓冲区中,再由其他处理线程或者进程来对缓冲区中的元素进行处理,这个方法尽量让listen队列中已连接的socket可以被移除。这个方法比较上述方法来说是比较好的一种,但是还是需要修改已有的代码。

线程池

服务器短开辟一个线程池,线程完成与原来单线程的服务器执行相同的操作,accept接受客户端的请求,并且开辟线程进行响应进行

服务器线程池

     pthread_t client[MAX_CLIENT_COUNT];
    for(int threadPos = 0;
        threadPos < MAX_CLIENT_COUNT;
        threadPos++)
    {
        if(pthread_create(&client[threadPos], NULL, ClientThreadFunc, serverIp) != 0)
        {
            perror("create client thread error...\n");
        }
    }  

线程池中的线程完成同原来服务器主线程完全相同的操作

 void* AcceptThreadFunc(void *args)
{
    int *psocketFd = (int *)args;
    int socketFd = *psocketFd;
    while( 1 )
    {
        int                 connFd;
        struct sockaddr_in  clientAddr;
        socklen_t           length = sizeof(clientAddr);
        /**********************************************************
         *
         *  ACCEPT返回一个新的套接字与客户端进行通信
         *
         **********************************************************/        if((connFd = accept(socketFd, (struct sockaddr*)&clientAddr, &length)) < 0)
        {
            printf("accept error, errno = %d...\n", errno);
            continue;
        }
        else
        {
            printf("Pthread id = %ld, connfd = %d\n",
                    (unsigned long)pthread_self(), connFd);
            ////////////////////////////////////////////////////////////////////////
            //
            //  这里填写服务器的处理代码
            //
            ////////////////////////////////////////////////////////////////////////
            printf("\n\naccept connect from client %s\n",inet_ntoa(clientAddr.sin_addr));
            printf("获取到从客户端%s的连接...\n\n\n", inet_ntoa(clientAddr.sin_addr));
            pthread_t   childThread;
            arg_type    raise_args = {connFd, clientAddr};
            /*
             *  第一个参数为指向线程标识符的指针。
             *  第二个参数用来设置线程属性。
             *  第三个参数是线程运行函数的起始地址。
             *  最后一个参数是运行函数的参数。
            */            if(pthread_create(&childThread, NULL, RaiseThreadFunc, &raise_args) != 0)
            {
                perror("pthread_create error...\n");
                break;
            }
            else
            {
                printf("create raise pthread success...\n");
            }
        }
    }
    return NULL;
}  

代码

服务器

 # include  <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#define TCP_SERVER_PORT             6666    /*  服务器的端口  */#define BUFFER_SIZE                 4096
#define IP_SIZE                     20
#define MAX_FILENAME_SIZE           256
#define LISTEN_QUEUE                10
#define MAX_ACCEPT_THREAD_COUNT     20
typedef struct arg_type
{
    int                 connFd;
    struct sockaddr_in  clientAddr;
} arg_type;
extern int errno;
/* 服务器接收从客户端传送来的文件  */void
TcpServerPullFile(
            int         connFd,                     /*  服务器与客户端通讯的套接字文件  */            struct      sockaddr_in clientAddr,     /*  与之通信的客户端的信息  */            char        *fileServerRoot);           /*  上传文件的存储路径  *//*  服务器将文件发送到客户端  */void TcpServerPushFile(
                    int         connFd,                  /*  服务器与客户端通讯的套接字文件  */                    struct      sockaddr_in clientAddr,  /*  与之通信的客户端的信息  */                    char        *filePath);              /*  带发送至客户端的文件路径  *//*  处理子进程退出的信号处理函数  */void SignalChild(int signo);                        /* 信号的标示信息         *//// pthread线程的处理客户端的请求信息
void RaiseClientRequest(
          int connFd,     /*  客户端的连接套接字描述符, 用于发送和接收数据  */          struct sockaddr_in  clientAddr); /*  客户端的信息, 用于显示一些客户端的信息  */void* RaiseThreadFunc(void *args);
//  accept pthread的多线程处理客户端连接的函数
void* AcceptThreadFunc(void *args);
        //  nt scketFd  服务器的监听套接字描述符
int main(int argc, char *argv[])
{
    /**********************************************************
     *
     *  创建并初始化服务器套接字
     *
     **********************************************************/    struct sockaddr_in      serverAddr;
    int                     socketFd;
    bzero(&serverAddr, sizeof(serverAddr));     /*  全部置零  */    /* 设置地址相关的属性 */    serverAddr.sin_family         =   AF_INET;
    serverAddr.sin_addr.s_addr    =   htons(INADDR_ANY);
    serverAddr.sin_port           =   htons(TCP_SERVER_PORT);
    /*  创建套接字  */    socketFd = socket(AF_INET, SOCK_STREAM, 0);
    if(socketFd < 0)
    {
        perror("socket create error\n");
        exit(-1);
    }
    else
    {
        printf("socket create success...\n");
        printf("创建套接字成功[errno = %d]...\n", errno);
    }
    /*  绑定端口  */    /**********************************************************
     *
     *  命名服务器的套接字, 进行BIND端口绑定
     *
     **********************************************************/    if(bind(socketFd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) > 0)
    {
        perror("bind error\n");
        exit(-1);
    }
    else
    {
        printf("server bind port %d success...\n", TCP_SERVER_PORT);
        printf("服务器绑定端口%d成功...\n", TCP_SERVER_PORT);
    }
    /*  开始监听绑定的端口  */    /**********************************************************
     *
     *  开始监听服务器绑定的端口
     *
     **********************************************************/    if(listen(socketFd, LISTEN_QUEUE))
    {
        printf("Server listen error[errno = %d]...\n", errno);
        exit(-1);
    }
    else
    {
        printf("Server listen success...\n");
        printf("服务器开始监听...\n");
    }
    //  SIGCHLD 进程Terminate或Stop的时候,SIGCHLD会发送给它的父进程。
    //  缺省情况下该Signal会被忽略
    signal(SIGCHLD, SignalChild);                //子进程退出的信号处理
    //  创建处理连接的线程池
    pthread_t   acceptThread[20];
    for(int threadPos = 0;
        threadPos < 20;
        threadPos++)
    {
        if(pthread_create(&acceptThread[threadPos], NULL, AcceptThreadFunc, &socketFd) != 0)    //创建线程来帮忙accept
        {
            perror("create accept thread error\n");
            exit(-1);
        }
        else
        {
            printf("Create pthread %d...\n", threadPos);
        }
    }
    while( 1 )
    {
        int connFd;
        struct sockaddr_in  clientAddr;
        socklen_t           length = sizeof(clientAddr);
        /* accept返回一个新的套接字与客户端进行通信  */        /**********************************************************
         *
         *  ACCEPT返回一个新的套接字与客户端进行通信
         *
         **********************************************************/        if((connFd = accept(socketFd, (struct sockaddr*)&clientAddr, &length)) < 0)
        {
            printf("accept error, errno = %d...\n", errno);
            continue;
        }
        else
        {
            ////////////////////////////////////////////////////////////////////////
            //
            //  这里填写服务器的处理代码
            //
            ////////////////////////////////////////////////////////////////////////
            printf("\n\naccept connect from client %s\n",inet_ntoa(clientAddr.sin_addr));
            printf("获取到从客户端%s的连接...\n\n\n", inet_ntoa(clientAddr.sin_addr));
            pthread_t   childThread;
            arg_type    raise_args = { connFd, clientAddr };
            /*
             *  第一个参数为指向线程标识符的指针。
             *  第二个参数用来设置线程属性。
             *  第三个参数是线程运行函数的起始地址。
             *  最后一个参数是运行函数的参数。
            */            if(pthread_create(&childThread, NULL, RaiseThreadFunc, &raise_args) != 0)
            {
                perror("pthread_create error...\n");
                break;
            }
        }
    }
    close(socketFd);
}
void* AcceptThreadFunc(void *args)
{
    int *psocketFd = (int *)args;
    int socketFd = *psocketFd;
    while( 1 )
    {
        int                 connFd;
        struct sockaddr_in  clientAddr;
        socklen_t           length = sizeof(clientAddr);
        /**********************************************************
         *
         *  ACCEPT返回一个新的套接字与客户端进行通信
         *
         **********************************************************/        if((connFd = accept(socketFd, (struct sockaddr*)&clientAddr, &length)) < 0)
        {
            printf("accept error, errno = %d...\n", errno);
            continue;
        }
        else
        {
            printf("Pthread id = %ld, connfd = %d\n",
                    (unsigned long)pthread_self(), connFd);
            ////////////////////////////////////////////////////////////////////////
            //
            //  这里填写服务器的处理代码
            //
            ////////////////////////////////////////////////////////////////////////
            printf("\n\naccept connect from client %s\n",inet_ntoa(clientAddr.sin_addr));
            printf("获取到从客户端%s的连接...\n\n\n", inet_ntoa(clientAddr.sin_addr));
            pthread_t   childThread;
            arg_type    raise_args = {connFd, clientAddr};
            /*
             *  第一个参数为指向线程标识符的指针。
             *  第二个参数用来设置线程属性。
             *  第三个参数是线程运行函数的起始地址。
             *  最后一个参数是运行函数的参数。
            */            if(pthread_create(&childThread, NULL, RaiseThreadFunc, &raise_args) != 0)
            {
                perror("pthread_create error...\n");
                break;
            }
            else
            {
                printf("create raise pthread success...\n");
            }
        }
    }
    return NULL;
}
/* 服务器接收从客户端传送来的文件  */void
TcpServerPullFile(
            int         connFd,                     /*  服务器与客户端通讯的套接字文件  */            struct      sockaddr_in clientAddr,     /*  与之通信的客户端的信息  */            char        *fileServerRoot)            /*  上传文件的存储路径  */{
    char    buffer[BUFFER_SIZE];
    char    filename[MAX_FILENAME_SIZE];
    char    fileServerPath[MAX_FILENAME_SIZE]/* = fileServerRoot*/;
    // 定义文件流
    FILE    *stream;
    int     count;              /*  发送文件名的字节数目  */    int     dataLength;         /*  接收到的数据大小  */    int     writeLength;        /* 实际写入的数据大小  */    int     flag = 0;
    bzero(buffer, BUFFER_SIZE);
    /*
     *  向客户端提示输入文件路径提示...
     *
     * strcpy(buffer, "请输入要传输的文件的完整路径:");
    strcat(buffer, "\n");
    send(new_server_socket, buffer, BUFFER_SIZE, 0);
    bzero(buffer, BUFFER_SIZE);
    */    /*  首先获取客户端发送过来的文件名  */    count =  recv (connFd, buffer, BUFFER_SIZE, 0);
    if(count < 0)
    {
        perror("获取文件名失败...\n");
        exit(1);
    }
    else
    {
        strncpy(filename, buffer, strlen(buffer) > MAX_FILENAME_SIZE ? MAX_FILENAME_SIZE : strlen(buffer));
        strcpy(fileServerPath, fileServerRoot);
        strcat(fileServerPath, filename);
        printf("\n获取客户端发送过来的文件名成功...\n");
        printf("文件名[%s]\n", filename);
        printf("文件存储路径[%s]\n\n", fileServerPath);
    }
    //  服务器接受数据, 首先打开一个文件流
    if((stream = fopen(fileServerPath, "w")) == NULL)
    {
        perror("file open error...\n");
        exit(1);
    }
    else
    {
        bzero(buffer,BUFFER_SIZE);
    }
    printf("正在接收来自%s的文件....\n",inet_ntoa(clientAddr.sin_addr));
    dataLength = 0;
    /*  先将数据接受到缓冲区buffer中,再写入到新建的文件中  */    while((dataLength = recv(connFd, buffer, BUFFER_SIZE, 0)) > 0)
    {
        flag++;
        if(flag == 1)
        {
            printf("正在接收来自%s的文件....\n", inet_ntoa(clientAddr.sin_addr));
        }
        if(dataLength < 0)
        {
            printf("接收错误i\n");
            exit(1);
        }
        /*  向文件中写入数据  */        writeLength = fwrite(buffer, sizeof(char), dataLength, stream);
        if(writeLength != dataLength)
        {
             printf("file write failed\n");
             exit(1);
        }
        bzero(buffer,BUFFER_SIZE);
    }
    if(flag > 0)
    {
        printf("%s的文件传送完毕\n", inet_ntoa(clientAddr.sin_addr));
    }
    if(flag==0)
    {
        printf("%s的文件传输失败\n", inet_ntoa(clientAddr.sin_addr));
    }
    fclose(stream);
    //rename("data",inet_ntoa(clientAddr.sin_addr));
    ///  BUG   这里其实有问题
    ///  因为客户端将文件发送完毕后, 服务器是不知道的,
    ///  因此当客户端文件发送完毕后, 服务器会陷入一个死等的循环
    ///  这时一个问题, 但是不是我们代码的重点,
    ///  因为我们的代码, 只是用于学习套接字网络编程
    ///
    ///  这个BUG其实很好处理, 因此我们在网络传输的过程中
    ///  客户端与服务器通信的数据肯定有我们自己的格式或者规范
    ///  比如 [request/response HEAD + LENGTH + DATA]的格式
    ///  要不然连基本的UDP丢包 和 TCP粘包问题都解决不了
    ///  一般情况下, 我们与客户
}
/*  服务器将文件发送到客户端
 *
 *  当用户选择了下载文件后,服务器将执行此操作
 *
 *  */void TcpServerPushFile(
                    int         connFd,                  /*  服务器与客户端通讯的套接字文件  */                    struct      sockaddr_in clientAddr,  /*  与之通信的客户端的信息  */                    char        *filePath)              /*  带发送至客户端的文件路径  */{
    //send file imformation
    char    buff[BUFFER_SIZE];
    char    filename[MAX_FILENAME_SIZE];
    int     count;
    FILE    *stream;
    /* 先将文件名发送给客户端
     * 2015-4-13 21:38 Modify
     * 发送文件名时只需要发送filePath最后的文件名filename就可以了
     * */    bzero(buff, BUFFER_SIZE);
    strcpy(filename, strrchr(filePath, '/') + 1);
    strncpy(buff, filename, strlen(filename) > MAX_FILENAME_SIZE ? MAX_FILENAME_SIZE : strlen(filename));
    count = send(connFd, buff, BUFFER_SIZE, 0);
    printf("服务器待发送的文件名[%s]..\n", filename);
    if(count < 0)
    {
        perror("Send file information");
        exit(1);
    }
    /*  服务器开始读取并且发送文件 : */    if((stream = fopen(filePath, "rb")) == NULL)
    {
        printf("%s not found!\n", filePath);
    }
    printf("服务器打开文件成功...\n");
    printf("正在向客户端发送文件...\n");
    bzero(buff, BUFFER_SIZE);
    int fileBlockLength = 0;
    while((fileBlockLength = fread(buff, sizeof(char), BUFFER_SIZE, stream)) > 0)
    {
        printf("读取了:%d个数据...\n",fileBlockLength);
        if((count =send(connFd, buff, fileBlockLength, 0)) < 0)
        {
            perror("Send file error...\n");
            perror("向客户端发送文件失败...\n");
            exit(1);
        }
        bzero(buff,BUFFER_SIZE);
    }
    fclose(stream);
    printf("服务器发送文件成功\n");
}
void* RaiseThreadFunc(void *args)
{
    arg_type            *arg        = (arg_type *)args;
    int                 connFd      = arg->connFd;
    struct sockaddr_in  clientAddr  = arg->clientAddr;
#ifdef DEBUG
    printf("==DEBUG== connfd = %d, client = %s\n", connFd,  inet_ntoa(clientAddr.sin_addr));
#endif
    RaiseClientRequest(connFd, clientAddr);
    return NULL;
}
/// 处理客户端的请求信息
void RaiseClientRequest(
        int connFd,     /*  客户端的连接套接字描述符, 用于发送和接收数据  */        struct sockaddr_in  clientAddr) /*  客户端的信息, 用于显示一些客户端的信息  */{
#ifdef DEBUG
    printf("==DEBUG== connfd = %d, client = %s\n", connFd,  inet_ntoa(clientAddr.sin_addr));
#endif
    printf("\n\n\n下面将依次测试  接收数据  发送数据  存储文件  推送文件\n\n\n");
    int count;
    char buffer[BUFFER_SIZE];
    //  首先测试接收客户端发送来的数据
    printf("===========recv data===========\n");
    bzero(buffer, BUFFER_SIZE);
    if((count = recv(connFd, buffer, BUFFER_SIZE, 0)) < 0)
    {
        printf("recv data error from %s error, errno = %d...\n", inet_ntoa(clientAddr.sin_addr), errno);
        printf("接收来自 %s 的数据错误, 错误码errno = %d....\n", inet_ntoa(clientAddr.sin_addr), errno);
    }
    else
    {
        printf("recv %d data : %s\n", count, buffer);
        printf("接收%d个数据 : %s\n", count, buffer);
    }
    printf("===========recv data===========\n\n\n");
    //  接着测试向客户端发送反馈数据
    printf("===========send data===========\n");
    bzero(buffer, BUFFER_SIZE);
    strcpy(buffer, "I am fine !");
    if((count = send(connFd, buffer, strlen(buffer) + 1, 0)) < 0)
    {
        printf("send data[%s] error[errno = %d]...\n", buffer, errno);
        printf("发送数据[%s] 失败[错误码 = %d]...\n", buffer, errno);
    }
    else
    {
        printf("send data[%s] success...\n", buffer);
        printf("发送数据[%s]成功...\n", buffer);
    }
    printf("===========send data===========\n\n\n");
    //  首先测试接收客户端发送来的数据
    printf("===========pull file============\n");
    TcpServerPullFile(connFd, clientAddr, "./sdata/"); /*  将客户端发送来的文件存储在./sdata目录下  */    printf("===========pull file============\n");
    //  首先测试接收客户端发送来的数据
    //printf("===========pull file============\n");
    //TcpServerPushFile(connFd, clientAddr, "./sdata/spush"); /*  将客户端发送来的文件存储在./sdata目录下  */    //printf("===========pull file============\n");
}
//  处理子进程退出的信号处理函数
void SignalChild(int signo)         //父进程对子进程结束的信号处理
{
    pid_t       pid;
    int         stat;
    //  SIGCHLD 20,17,18 B 子进程结束信号
    printf("get a signal %d\n", signo);
    while((pid = waitpid(-1, &stat, WNOHANG)) > 0)
    {
        printf("child %d terminated\n", pid);
        printf("子进程%d终止\n", pid);
    }
}  

客户端

 #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#define TCP_SERVER_PORT     6666
#define MAX_FILENAME_SIZE   256
#define IP_SIZE             20
#define BUFFER_SIZE         4096
#define MAX_CLIENT_COUNT    1000   //  模拟1000个客户端
pthread_mutex_t mut         =   PTHREAD_MUTEX_INITIALIZER;   //  互斥量
pthread_cond_t  cond        =   PTHREAD_COND_INITIALIZER;    //  条件变量
int             condValue   =   1;
extern int errno;
//  客户端进行的处理逻辑
void RaiseServerResponse(int socketFd);
/* 从服务器上下载文件  */void TcpClientPullFile(int socketFd, char *filePath);
/* 客户端将文件上传到服务器上 */void TcpClientPushFile(int socketFd, char *filePath);
//  客户端线程的处理函数
void* ClientThreadFunc(void *args);
int main(int argc, char *argv[])
{
    char                serverIp[IP_SIZE];              /*  服务器的IP地址          */    if(argc >= 2)                       /*  参数过多的时候,提示用户                */    {
        printf("You have given to much parameters...\n");
        printf("Yous should give the IP address after %s\n without any other parametes...\n", (char *)argv[0]);
    }
    else if(argc == 1)                  /*  只有一个参数,则默认使用localhost(127.0.0.1)  */    {
        strcpy(serverIp, "127.0.0.1");
    }
    else
    {
        strcpy(serverIp, argv[1]);
    }
    pthread_t client[MAX_CLIENT_COUNT];
    for(int threadPos = 0;
        threadPos < MAX_CLIENT_COUNT;
        threadPos++)
    {
        if(pthread_create(&client[threadPos], NULL, ClientThreadFunc, serverIp) != 0)
        {
            perror("create client thread error...\n");
        }
    }
    condValue = 2;
    while(1)
    {
        sleep(2);
        pthread_cond_broadcast(&cond);  //条件满足后发信号通知所有阻塞在条件变量上的线程!
    }
    return EXIT_SUCCESS;
}
void* ClientThreadFunc(void *args)
{
    char *serverIp = (char *)args;
    struct sockaddr_in  serverAddr;         
    int                 socketFd;                    
    bzero(&serverAddr, sizeof(serverAddr));             /*  全部置零               */    serverAddr.sin_family       =   AF_INET;                      /*  internet协议族         */    serverAddr.sin_addr.s_addr  =   inet_addr(serverIp);        /*  设置所连接服务器的IP   */    serverAddr.sin_port         =   htons(TCP_SERVER_PORT);       /*  设置连接的服务器端口   */    /*  开始创建套接字                        */    /*  SOCK_STREAM 面向连接的套接字,即TCP   */    socketFd                    =   socket(AF_INET, SOCK_STREAM, 0);
    if(socketFd < 0)
    {
        printf("socket error\n");
        exit(-1);
    }
    printf("Wait for connect...\n");
    pthread_cond_wait(&cond, &mut);
    pthread_mutex_unlock(&mut);
    //  信号会丢失,使得这里永远醒不了,所以需要重发信号.
    //  直到在客户端的主函数中调用pthread_cond_broadcast(&cond);
    /*  尝试连接服务器  */    if(connect(socketFd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0)
    {
        printf("Can Not Connect To %s\n", serverIp);
        exit(1);
    }
    else
    {
        printf("connect to the server %s SUCCESS...\n", serverIp);
        printf("连接服务器成功...\n");
    }
    /**********************************************************
     *
     *  下面进行正常的套接字通信
     *
     **********************************************************/    RaiseServerResponse(socketFd);
    //  关闭套接字的文件描述符
    close(socketFd);
    return EXIT_SUCCESS;
}
/* 客户端将文件上传到服务器上 */void TcpClientPushFile(int socketFd, char *filePath)
{
    FILE    *stream;
    char    buffer[BUFFER_SIZE];
    char    filename[MAX_FILENAME_SIZE];
    int     count = 0;
    bzero(buffer, BUFFER_SIZE);
    strcpy(filename, strrchr(filePath, '/') + 1);
    strncpy(buffer, filename, strlen(filename) > MAX_FILENAME_SIZE ? MAX_FILENAME_SIZE : strlen(filename));
    if((count = send(socketFd, buffer, BUFFER_SIZE, 0)) < 0)
    {
        perror("Send file information");
        exit(1);
    }
    printf("客户端待上传待文件名[%s]..\n", filename);
    /*  打开文件流  */    if((stream = fopen(filePath, "r")) == NULL)
    {
        printf("Can't open the file [%s], errno = %d\n", filePath, errno);
        exit(-1);
    }
    else
    {
        printf("客户端打开文件成功\n");
    }
    printf("正在向服务器传上传文件...\n");
    count = 0;
    /*  清空缓冲区  */    bzero(buffer, BUFFER_SIZE);
    /*  不断读取并发送数据  */    while((count = fread(buffer, 1, BUFFER_SIZE, stream)) > 0)
    {
        // printf("count =%d\n", count);
        if(send(socketFd, buffer, count, 0) < 0)
        {
            printf("send file error...\n");
            break;
        }
        bzero(buffer, BUFFER_SIZE);  /*  再次将缓冲区清空  */    }
    printf("向服务器发送文件成功...\n");
    /* 传送完毕后, 关闭文件流  */    if(fclose(stream))
    {
        printf("file close error\n");
        exit(1);
    }
    else
    {
        printf("关闭文件流成功...\n");
    }
    ///  BUG   这里其实有问题
    ///  因为客户端将文件发送完毕后, 服务器是不知道的,
    ///  因此当客户端文件发送完毕后, 服务器会陷入一个死等的循环
    ///  这时一个问题, 但是不是我们代码的重点,
    ///  因为我们的代码, 只是用于学习套接字网络编程
    ///
    ///  这个BUG其实很好处理, 因此我们在网络传输的过程中
    ///  客户端与服务器通信的数据肯定有我们自己的格式或者规范
    ///  比如 [request/response HEAD + LENGTH + DATA]的格式
    ///  要不然连基本的UDP丢包 和 TCP粘包问题都解决不了
    ///  一般情况下, 我们与客户
    /*  关闭与服务器通讯的套接字  */    //close(socketFd);
}
/* 从服务器上下载文件  */void TcpClientPullFile(int socketFd, char *filePath)
{
    char    buff[BUFFER_SIZE];
    char    filename[MAX_FILENAME_SIZE];
    int     count, writeLength, dataLength;
    FILE    *stream;
    bzero(buff,BUFFER_SIZE);
    /*  首先获取服务器发送过来的文件名  */    if((count = recv(socketFd, buff, BUFFER_SIZE, 0)) < 0)
    {
        perror("获取文件名失败...\n");
        exit(1);
    }
    strncpy(filename, buff, strlen(buff) > MAX_FILENAME_SIZE ? MAX_FILENAME_SIZE : strlen(buff));
    /*  开始接收文件  */    printf("Preparing download file : %s", filename);
    /*  打开文件流  */    if((stream = fopen(filename, "wb+")) == NULL)
    {
        perror("create file %s error...\n");
        perror("创建文件失败...\n");
        exit(1);
    }
    bzero(buff, BUFFER_SIZE);          /*  清空缓冲区  */    dataLength = 0;
    while((dataLength = recv(socketFd, buff, BUFFER_SIZE, 0)) != 0)
    {
        if(dataLength < 0)  /* 如果接收文件失败  */        {
            perror("download error...\n");
            perror("下载文件失败...\n");
            exit(1);
        }
        /*  将接收到的文件数据写入文件中  */        writeLength = fwrite(buff, sizeof(char), dataLength, stream);
        if(writeLength < dataLength)   /*  如果写入的数据比实际接收到的数据少  */        {
            perror("file write error...\n");
            perror("写入文件失败...\n");
            exit(1);
        }
        bzero(buff, BUFFER_SIZE);               /* 清空缓冲区  */    }
    printf("下载来自服务器%s的文件成功\n", filename);
    printf("Receieved file:%s finished!\n", filename);
    fclose(stream);             /*  关闭文件流 */}
//  客户端进行的处理逻辑
void RaiseServerResponse(int socketFd)
{
    char                buffer[BUFFER_SIZE];            /*  数据缓冲区              */    int                 count;                          /*  接受或者发送的数据大小  */    printf("\n\n\n下面将依次测试  发送数据  接收数据  上传文件  下载文件\n\n\n");
    //  发送数据流
    printf("===========send data===========\n");
    bzero(buffer, BUFFER_SIZE);
    strcpy(buffer, "How are you ?");
    if((count = send(socketFd, buffer, strlen(buffer) + 1, 0)) < 0)
    {
        printf("send data[%s] error[errno = %d]...\n", buffer, errno);
        printf("发送数据[%s] 失败[错误码 = %d]...\n", buffer, errno);
    }
    else
    {
        printf("send data[%s] success...\n", buffer);
        printf("发送数据[%s]成功...\n", buffer);
    }
    printf("===========send data===========\n\n\n");
    //  接受数据流
    printf("===========recv data===========\n");
    bzero(buffer, BUFFER_SIZE);
    if((count = recv(socketFd, buffer, BUFFER_SIZE, 0)) < 0)
    {
        printf("recv data error[errno = %d]...\n", errno);
        printf("接收数据失败[错误码 = %d]...\n", errno);
    }
    else
    {
        printf("recv %d data : %s\n", count, buffer);
        printf("接收%d个数据 : %s\n", count, buffer);
    }
    printf("===========recv data===========\n\n\n");
    // 上传文件到服务器
    printf("===========push file===========\n");
    TcpClientPushFile(socketFd, "./cdata/cpush");
    printf("===========push file===========\n\n\n");
    // 上传文件到服务器
    //printf("===========push file===========\n");
    //TcpClientPullFile(socketFd, "./cdata/");
    //printf("===========push file===========\n\n\n");
    //close(socketFd);
}  

运行结果

需要C/C++ Linux服务器架构师学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

相关文章