connection.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # filename: connection.py
  4. import socket
  5. import os
  6. import sys
  7. import time
  8. import random
  9. from itertools import chain
  10. from fdfs_client.exceptions import (
  11. FDFSError,
  12. ConnectionError,
  13. ResponseError,
  14. InvaildResponse,
  15. DataError
  16. )
  17. # start class Connection
  18. class Connection(object):
  19. """Manage TCP comunication to and from Fastdfs Server."""
  20. def __init__(self, **conn_kwargs):
  21. self.pid = os.getpid()
  22. self.host_tuple = conn_kwargs['host_tuple']
  23. self.remote_port = conn_kwargs['port']
  24. self.remote_addr = None
  25. self.timeout = conn_kwargs['timeout']
  26. self._sock = None
  27. def __del__(self):
  28. try:
  29. self.disconnect()
  30. except:
  31. pass
  32. def connect(self):
  33. """Connect to fdfs server."""
  34. if self._sock:
  35. return
  36. try:
  37. sock = self._connect()
  38. except socket.error as e:
  39. raise ConnectionError(self._errormessage(e))
  40. self._sock = sock
  41. #print '[+] Create a connection success.'
  42. #print '\tLocal address is %s:%s.' % self._sock.getsockname()
  43. #print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
  44. def _connect(self):
  45. '''Create TCP socket. The host is random one of host_tuple.'''
  46. self.remote_addr = random.choice(self.host_tuple)
  47. #print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
  48. #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  49. #sock.settimeout(self.timeout)
  50. sock = socket.create_connection((self.remote_addr, self.remote_port),self.timeout)
  51. return sock
  52. def disconnect(self):
  53. """Disconnect from fdfs server."""
  54. if self._sock is None:
  55. return
  56. try:
  57. self._sock.close()
  58. except socket.error as e:
  59. raise ConnectionError(self._errormessage(e))
  60. self._sock = None
  61. def get_sock(self):
  62. return self._sock
  63. def _errormessage(self, exception):
  64. # args for socket.error can either be (errno, "message")
  65. # or just "message" """
  66. if len(exception.args) == 1:
  67. return "[-] Error: connect to %s:%s. %s." % \
  68. (self.remote_addr, self.remote_port, exception.args[0])
  69. else:
  70. return "[-] Error: %s connect to %s:%s. %s." % \
  71. (exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
  72. # end class Connection
  73. # start ConnectionPool
  74. class ConnectionPool(object):
  75. """Generic Connection Pool"""
  76. def __init__(self, name='', conn_class=Connection,
  77. max_conn=None, **conn_kwargs):
  78. self.pool_name = name
  79. self.pid = os.getpid()
  80. self.conn_class = conn_class
  81. self.max_conn = max_conn or 2 ** 31
  82. self.conn_kwargs = conn_kwargs
  83. self._conns_created = 0
  84. self._conns_available = []
  85. self._conns_inuse = set()
  86. # print '[+] Create a connection pool success, name: %s.' % self.pool_name
  87. def _check_pid(self):
  88. if self.pid != os.getpid():
  89. self.destroy()
  90. self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
  91. def make_conn(self):
  92. """Create a new connection."""
  93. if self._conns_created >= self.max_conn:
  94. raise ConnectionError('[-] Error: Too many connections.')
  95. num_try = 10
  96. while True:
  97. try:
  98. if num_try <= 0:
  99. sys.exit()
  100. conn_instance = self.conn_class(**self.conn_kwargs)
  101. conn_instance.connect()
  102. self._conns_created += 1
  103. break
  104. except ConnectionError as e:
  105. print(e)
  106. num_try -= 1
  107. conn_instance = None
  108. return conn_instance
  109. def get_connection(self):
  110. """Get a connection from pool."""
  111. self._check_pid()
  112. try:
  113. conn = self._conns_available.pop()
  114. # print '[+] Get a connection from pool %s.' % self.pool_name
  115. # print '\tLocal address is %s:%s.' % conn._sock.getsockname()
  116. # print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
  117. except IndexError:
  118. conn = self.make_conn()
  119. self._conns_inuse.add(conn)
  120. return conn
  121. def remove(self, conn):
  122. """Remove connection from pool."""
  123. if conn in self._conns_inuse:
  124. self._conns_inuse.remove(conn)
  125. self._conns_created -= 1
  126. if conn in self._conns_available:
  127. self._conns_available.remove(conn)
  128. self._conns_created -= 1
  129. def destroy(self):
  130. """Disconnect all connections in the pool."""
  131. all_conns = chain(self._conns_inuse, self._conns_available)
  132. for conn in all_conns:
  133. conn.disconnect()
  134. # print '[-] Destroy connection pool %s.' % self.pool_name
  135. def release(self, conn):
  136. """Release the connection back to the pool."""
  137. self._check_pid()
  138. if conn.pid == self.pid:
  139. self._conns_inuse.remove(conn)
  140. self._conns_available.append(conn)
  141. # print '[-] Release connection back to pool %s.' % self.pool_name
  142. # end ConnectionPool class
  143. def tcp_recv_response(conn, bytes_size, buffer_size = 1024):
  144. '''Receive response from server.
  145. It is not include tracker header.
  146. arguments:
  147. @conn: connection
  148. @bytes_size: int, will be received byte_stream size
  149. @buffer_size: int, receive buffer size
  150. @Return: tuple,(response, received_size)
  151. '''
  152. response = ''
  153. total_size = 0
  154. total_bytes_size = bytes_size
  155. try:
  156. while 1:
  157. if total_bytes_size - total_size <= buffer_size:
  158. resp = conn._sock.recv(buffer_size)
  159. response += resp
  160. total_size += len(resp)
  161. break
  162. resp = conn._sock.recv(buffer_size)
  163. response += resp
  164. total_size += len(resp)
  165. except (socket.error, socket.timeout), e:
  166. raise ConnectionError('[-] Error: while reading from socket: (%s)' \
  167. % e.args)
  168. return (response, total_size)
  169. def tcp_send_data(conn, bytes_stream):
  170. """Send buffer to server.
  171. It is not include tracker header.
  172. arguments:
  173. @conn: connection
  174. @bytes_stream: trasmit buffer
  175. @Return bool
  176. """
  177. try:
  178. conn._sock.sendall(bytes_stream)
  179. except (socket.error, socket.timeout) as e:
  180. raise ConnectionError('[-] Error: while writting to socket: (%s)' \
  181. % e.args)