/*************************************************************************** Filename: rtsig_echoserver.cpp Title: Linux RT Signal concurrent server programming simple example Description: RT Signal server which handles multiple clients and echoing messages. Tested with linux kernel 2.4.5, glibc 2.2.2 Author: Lee, Hongki Date: 2001-06-12 Requirement : Linux RT Signal(>kernel 2.2.x) signal-per-fd patch : this implemention is not yet complete tested. (do not use this time, leave it undefined __USE_ONESIGFD.) visit this, (for korean) http://www.chonga.pe.kr/document/linux/highperfsvr/index.html and visit this, (it's an inital documentation) http://www.mail-archive.com/linux-kernel@vger.kernel.org/msg45354.html Misc: tab size is 4 Rights: for FREE, for your honor ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define MAX_SOCKET 1000 //#define MAX_SOCKET 5000 #undef FD_SETSIZE #define FD_SETSIZE MAX_SOCKET #define PORT 50071 //#define __USE_ONESIGFD void setup_sigio(int fd); ///////////////////////////////////////////////////////////////////// // BufferQueue // :: not yet optimized (eg. memory copying) class CBufQueue { private: unsigned char *m_pbBuffer; int m_iRear, m_iFront; int m_iMaxBufCount, m_iCount; public: void clear() { // ÃʱâÈ­ m_iRear=m_iFront=-1; m_iCount=0; } void init(int iMaxBuf) { m_iMaxBufCount=iMaxBuf; // 4092BYTE clear(); m_pbBuffer = new unsigned char[m_iMaxBufCount]; } CBufQueue() { init(4096); } CBufQueue(int iMaxBuf) { init(iMaxBuf); } ~CBufQueue() { delete m_pbBuffer; } bool push(unsigned char *pbBuf,int iSize) ; int pop(unsigned char *pbBuf,int iSize) ; int query(unsigned char *pbBuf,int iSize) ; int getSize(); }; bool CBufQueue::push(unsigned char *pbBuf,int iSize) { if ((m_iMaxBufCount-m_iCount)clear(); sQueue->clear(); } CFDSet() { rQueue = new CBufQueue(512); sQueue = new CBufQueue(512); } ~CFDSet() { delete rQueue; delete sQueue; } }; CFDSet g_fdlist[MAX_SOCKET]; static int g_rtsig=SIGRTMIN; volatile int g_count; int g_listenfd; static char * code2string(int si_code) { switch (si_code) { case (SI_USER): return "SI_USER"; case (SI_QUEUE): return "SI_QUEUE"; case (SI_TIMER): return "SI_TIMER"; case (SI_MESGQ): return "SI_MESGQ"; case (SI_ASYNCIO): return "SI_ASYNCIO"; case (SI_SIGIO): return "SI_SIGIO"; case (POLL_IN): return "POLL_IN"; case (POLL_OUT): return "POLL_OUT"; case (POLL_MSG): return "POLL_MSG"; case (POLL_ERR): return "POLL_ERR"; case (POLL_PRI): return "POLL_PRI"; case (POLL_HUP): return "POLL_HUP"; default: return "unknown"; } } // socket recv/send int sendbufdata(int iSocketID, const void* mesg, const int sizen, int &sentn) { int writen, leftn; leftn=sizen; //int pos=0; //for other compiler while (leftn>0) { if ( (writen=send(iSocketID,mesg,leftn,0))<=0) { // EAGAIN󸮵µ ÇØ¾ßÇÔ. if (errno==EAGAIN) { // EWOULDBLOCK puts("send has EAGAIN"); } else { printf("send has other error : %d \n",errno); } sentn = (sizen-leftn); return -(EAGAIN); // À½¼ö·Î Ç¥Çö } leftn -= writen; (unsigned char*)mesg += writen; // pos += writen; //for other compiler } sentn = (sizen-leftn); return sentn; } int recvbufdata(int iSocketID, void *mesg, int sizen, int &recvn) { //puts("recvBufData start"); int readn, leftn; leftn=sizen; while (leftn>0) { if ( (readn=recv(iSocketID,mesg,leftn,0))<0) { recvn = (sizen-leftn); return -(errno); } else if (readn==0) { printf("[E] TCPSock tcp_recv: 0 packet recved. \n") ; break; } leftn -= readn; (unsigned char*)mesg += readn; } recvn = sizen-leftn; return recvn; } // client connection handling int new_conn(int fd) { int newfd; struct sockaddr_in sin; int optlen = sizeof(sin); while ((newfd = accept(fd, (struct sockaddr *)&sin, (socklen_t*)&optlen)) < 0) { if (errno == EINTR) continue; else { //if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { if (errno == EAGAIN) { // Linux¿¡¼­´Â EWOULDBLOCK°ú °°´Ù. puts("nothing to accept"); return -1; } else { printf( "new_conn: error accepting new connection. (%d)\n", errno); return -1; } } } return newfd; } int close_conn(int fd) { g_count--; puts("closing fd and remove async"); fcntl(fd, F_SETFL, O_RDWR); // remove async g_fdlist[fd].fd = -1; g_fdlist[fd].clearQueue(); close(fd); return 1; } int do_read(int fd) { CFDSet* pfs=&g_fdlist[fd]; unsigned char buf[1024]; int sizen=0; ioctl(pfs->fd, FIONREAD,&sizen); if (sizen==0) { puts("do_read: ¹öÆÛ¿¡ 0"); return 0; } // ÀÌ·± °æ¿ì°¡ ¹ß»ýÇϴ°¡? bzero(buf,1024); int retsize=0; int ret=recvbufdata(pfs->fd,(void *)buf, sizen, retsize); if (ret<=0) { if (ret== -(EAGAIN) ) { // EAGAINÀÌÁö¸¸.. if (pfs->rQueue->push(buf,retsize)==false) { puts("rQueue ÀúÀå½ÇÆÐ"); } puts("RECV WOULD BLOCK ¹Þ¾Æ¼­ ³Ñ¾î°¨"); return 0; } else { puts("rcv error"); return -1; } } else { if (pfs->rQueue->push(buf,retsize)==false) { puts("rQueue ÀúÀå½ÇÆÐ2"); } } return retsize; } int do_write(int fd) { CFDSet* pfs=&g_fdlist[fd]; unsigned char buf[1024]; bzero(buf,1024); int sizen=pfs->sQueue->query(buf,pfs->sQueue->getSize()); int retsize=0; int ret=sendbufdata(pfs->fd,(void *)buf, sizen,retsize); if (ret<0) { if (ret!= -(EAGAIN)) { return ret; //return retsize; } else { puts("SEND WOULDBLOCK "); } } printf("Sent %s\n",(char*)buf); if (pfs->sQueue->pop(buf,retsize)>0) { // ¿©±â¼­ ´Ü¼øÈ÷ Áö¿ì´Â ±¸¹®µµ ¸¸µéÀÚ. puts("send bufqueue¿¡¼­ Á¦°Å ¼º°ø"); } return retsize; } void handle(siginfo_t *si) { int fd=si->si_fd; int revent = si->si_code; int revent2 = si->si_band; printf("fd is %d and si_code %d, %s, si_band %d\n",fd,revent, code2string(revent), revent2); int newclientfd=-1; if (g_listenfd==fd) { if (revent==POLL_IN) { puts("LISTEN ACCEPT!!!!"); newclientfd=new_conn(fd); if (newclientfd>0) { g_fdlist[newclientfd].fd = newclientfd; setup_sigio(newclientfd); g_count++; printf("!!!! %d clients:new client added and setup sigio queue added\n",g_count); // Á¢¼ÓÇÏÀÚ¸¶ÀÚ, client°¡ µ¥ÀÌÅ͸¦ ³¯·È´ÂÁö °Ë»çÇØ¼­ ¾Æ·¡·Î ³Ñ±ä´Ù. int sizen=0; ioctl(newclientfd, FIONREAD,&sizen); if (sizen>0) { fd=newclientfd; revent=POLL_IN;} // ÀÌ·± °æ¿ì°¡ ¹ß»ýÇϴ°¡? } } } if (g_listenfd!=fd) { switch(revent) { case POLL_IN: { char data[1024]; int retsize=0; if ((retsize=do_read(fd))<0) { close_conn(fd); } else { bzero(data,1024); g_fdlist[fd].rQueue->query((unsigned char*)data,retsize); printf("¹ÞÀºµ¥ÀÌÅÍ[%d]: %s",retsize,data); } // ÀÏ´Ü ECHOŬ¶óÀ̾ðÆ®À̹ǷΠsend buff¿¡ ³Ö¾îÁÜ if (g_fdlist[fd].rQueue->pop((unsigned char*)data,retsize)>0) { if (g_fdlist[fd].sQueue->push((unsigned char*)data,retsize)==true) puts("echoing success queue transfer"); } do_write(fd); break; } case POLL_OUT: // 󸮵ÇÁö ¾ÊÀº Àü¼Ûó¸® do_write(fd); case POLL_ERR: default: { // fd is 4 and si_code 4, POLL_ERR, si_band 8 close_conn(fd); break; } } // switch } puts("do something signal recved. something!!"); return ; } void do_sigio_overflow(int a) { puts ("kernel detected overflows"); // signal-per-fd¿¡¼­´Â ÀϾÁö ¾Ê´Â´Ù°í ÇÑ´Ù. return ; } void* do_signal_loop(void*args) { puts("thread startup"); sigset_t signals; siginfo_t siginfo; int signum, sd; // Block the RT signal sigemptyset(&signals); sigaddset(&signals, SIGRTMIN); sigprocmask(SIG_BLOCK, &signals, 0); while (1) { // Dequeue a signal from the signal queue puts("sigwaitinfo before"); signum = sigwaitinfo(&signals, &siginfo); puts("sigwaitinfo after"); // Check if the signal is an RT signal if (signum == SIGRTMIN) { puts("handled signal3 SIGRTMIN"); // Identify the socket associated with the signal handle(&siginfo); } puts("sigwaitinfo loop done"); } } void do_exit(int a) { puts("catched exit"); exit(-1); return; } // setup signals // void setup_sigio(int fd) { // Associate an RT signal // with the new socket fcntl(fd, F_SETOWN, getpid()); fcntl(fd, F_SETSIG, SIGRTMIN); fcntl(fd, F_SETFL, O_RDWR|O_NONBLOCK|FASYNC); // ASYNC IO, NON BLOCK ¼³Á¤, BSD ȣȯ // FASYNC == O_ASYNC #ifdef __USE_ONESIGFD fcntl(fd,F_SETAUXFL,O_ONESIGFD); // signal-per-fd patched #endif return; } void setup_signal_handler(void) { struct sigaction sigact; sigemptyset(&sigact.sa_mask); sigact.sa_flags = SA_SIGINFO; sigact.sa_restorer = NULL; sigact.sa_handler = do_exit; if (sigaction(SIGINT, &sigact, NULL) < 0) { exit(1); } if (sigaction(SIGTERM, &sigact, NULL) < 0) { exit(1); } sigact.sa_handler = SIG_IGN; if (sigaction(SIGPIPE, &sigact, NULL) < 0) { exit(1); } sigact.sa_handler = do_sigio_overflow; if (sigaction(SIGIO, &sigact, NULL) < 0) { exit(1); } puts("setup signaled"); return; } int main() { /* buf queue test CBufQueue cBQ; unsigned char data[4096]; int r=0; if (cBQ.push((unsigned char*)"1234567890ABCDEFG",17)==true) { printf ("Áý¾î³Ö¾ú´Ù. q, size, %d\n", cBQ.getSize()); } else { puts("Áý¾î³Ö±â½ÇÆÐ´Ù."); } bzero(data,4096); if ((r=cBQ.pop(data,10))!=0) { printf("%d, %s °¡Á®¿È.\n",r,data); } else { puts("°¡Á®¿Ã°Ô¾ø´Ù."); } bzero(data,4096); if ((r=cBQ.pop(data,10))!=0) { printf("%d, %s °¡Á®¿È.\n",r,data); } else { puts("°¡Á®¿Ã°Ô¾ø´Ù."); } printf ("q, size %d, %d\n",0, cBQ.getSize()); exit(-1); */ // resource set struct rlimit rlim; rlim.rlim_cur = rlim.rlim_max = MAX_SOCKET; if(setrlimit(RLIMIT_NOFILE,&rlim)) { puts("error set max fds"); exit(1); } // setup system wide sig handler setup_signal_handler(); socklen_t clilen; struct sockaddr_in cliaddr, servaddr; int listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd <0 ) { puts("listen failed"); exit(-1); } bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(PORT); int one=1; setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof( one ) ); if ((bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)))<0) { puts("bind err"); exit(0); } if (listen(listenfd, 256)<0) { // backlog 256 puts("listen faild"); exit(0); } g_count=0; setup_sigio(listenfd); g_listenfd = listenfd; do_signal_loop(0); // ÀÌÂÊÀ¸·Î´Â ³ª¿ÃÀÏ ¾ø´Ù. return 1; }