connection.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # filename: connection.py
  4. import socket
  5. import os
  6. import sys
  7. import time
  8. import random
  9. from itertools import chain
  10. from fdfs_client.exceptions import (
  11. FDFSError,
  12. ConnectionError,
  13. ResponseError,
  14. InvaildResponse,
  15. DataError
  16. )
  17. # start class Connection
  18. class Connection(object):
  19. """Manage TCP comunication to and from Fastdfs Server."""
  20. def __init__(self, **conn_kwargs):
  21. self.pid = os.getpid()
  22. self.host_tuple = conn_kwargs['host_tuple']
  23. self.remote_port = None
  24. self.remote_addr = None
  25. self.timeout = conn_kwargs['timeout']
  26. self._sock = None
  27. def __del__(self):
  28. try:
  29. self.disconnect()
  30. except:
  31. pass
  32. def connect(self):
  33. """Connect to fdfs server."""
  34. if self._sock:
  35. return
  36. try:
  37. sock = self._connect()
  38. except socket.error as e:
  39. raise ConnectionError(self._errormessage(e))
  40. self._sock = sock
  41. # print '[+] Create a connection success.'
  42. # print '\tLocal address is %s:%s.' % self._sock.getsockname()
  43. # print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
  44. def sendall(self, msg):
  45. if not self._sock:
  46. self.connect()
  47. self._sock.sendall(msg)
  48. def recv(self, len):
  49. return self._sock.recv(len)
  50. def _connect(self):
  51. """Create TCP socket. The host is random one of host_tuple."""
  52. self.remote_addr, self.remote_port = random.choice(self.host_tuple)
  53. # print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
  54. # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  55. # sock.settimeout(self.timeout)
  56. sock = socket.create_connection((self.remote_addr, self.remote_port), self.timeout)
  57. return sock
  58. def disconnect(self):
  59. """Disconnect from fdfs server."""
  60. if self._sock is None:
  61. return
  62. try:
  63. self._sock.close()
  64. except socket.error as e:
  65. pass
  66. self._sock = None
  67. def get_sock(self):
  68. return self._sock
  69. def _errormessage(self, exception):
  70. # args for socket.error can either be (errno, "message")
  71. # or just "message" """
  72. if len(exception.args) == 1:
  73. return "[-] Error: connect to %s:%s. %s." % \
  74. (self.remote_addr, self.remote_port, exception.args[0])
  75. else:
  76. return "[-] Error: %s connect to %s:%s. %s." % \
  77. (exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
  78. # end class Connection
  79. # start ConnectionPool
  80. class ConnectionPool(object):
  81. """Generic Connection Pool"""
  82. def __init__(self, name='', conn_class=Connection,
  83. max_conn=None, **conn_kwargs):
  84. self.pool_name = name
  85. self.pid = os.getpid()
  86. self.conn_class = conn_class
  87. self.max_conn = max_conn or 2 ** 31
  88. self.conn_kwargs = conn_kwargs
  89. self._conns_created = 0
  90. self._conns_available = []
  91. self._conns_inuse = set()
  92. # print '[+] Create a connection pool success, name: %s.' % self.pool_name
  93. def _check_pid(self):
  94. if self.pid != os.getpid():
  95. self.destroy()
  96. self.__init__(self.pool_name, self.conn_class, self.max_conn, **self.conn_kwargs)
  97. def make_conn(self):
  98. """Create a new connection."""
  99. if self._conns_created >= self.max_conn:
  100. raise ConnectionError('[-] Error: Too many connections.')
  101. num_try = 10
  102. while True:
  103. try:
  104. if num_try <= 0:
  105. sys.exit()
  106. conn_instance = self.conn_class(**self.conn_kwargs)
  107. conn_instance.connect()
  108. self._conns_created += 1
  109. break
  110. except ConnectionError as e:
  111. print(e)
  112. num_try -= 1
  113. conn_instance = None
  114. return conn_instance
  115. def get_connection(self):
  116. """Get a connection from pool."""
  117. self._check_pid()
  118. try:
  119. conn = self._conns_available.pop()
  120. # print '[+] Get a connection from pool %s.' % self.pool_name
  121. # print '\tLocal address is %s:%s.' % conn._sock.getsockname()
  122. # print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
  123. except IndexError:
  124. conn = self.make_conn()
  125. self._conns_inuse.add(conn)
  126. return conn
  127. def remove(self, conn):
  128. """Remove connection from pool."""
  129. if conn in self._conns_inuse:
  130. self._conns_inuse.remove(conn)
  131. self._conns_created -= 1
  132. if conn in self._conns_available:
  133. self._conns_available.remove(conn)
  134. self._conns_created -= 1
  135. def destroy(self):
  136. """Disconnect all connections in the pool."""
  137. all_conns = chain(self._conns_inuse, self._conns_available)
  138. for conn in all_conns:
  139. conn.disconnect()
  140. # print '[-] Destroy connection pool %s.' % self.pool_name
  141. def release(self, conn):
  142. """Release the connection back to the pool."""
  143. self._check_pid()
  144. if conn.pid == self.pid:
  145. self._conns_inuse.remove(conn)
  146. self._conns_available.append(conn)
  147. # print '[-] Release connection back to pool %s.' % self.pool_name
  148. # end ConnectionPool class
  149. def tcp_recv_response(conn, bytes_size, buffer_size=4096):
  150. """Receive response from server.
  151. It is not include tracker header.
  152. arguments:
  153. @conn: connection
  154. @bytes_size: int, will be received byte_stream size
  155. @buffer_size: int, receive buffer size
  156. @Return: tuple,(response, received_size)
  157. """
  158. recv_buff = []
  159. total_size = 0
  160. try:
  161. while bytes_size > 0:
  162. resp = conn._sock.recv(buffer_size)
  163. recv_buff.append(resp)
  164. total_size += len(resp)
  165. bytes_size -= len(resp)
  166. except (socket.error, socket.timeout) as e:
  167. raise ConnectionError('[-] Error: while reading from socket: (%s)' \
  168. % e.args)
  169. return b''.join(recv_buff), total_size
  170. def tcp_send_data(conn, bytes_stream):
  171. """Send buffer to server.
  172. It is not include tracker header.
  173. arguments:
  174. @conn: connection
  175. @bytes_stream: trasmit buffer
  176. @Return bool
  177. """
  178. try:
  179. conn._sock.sendall(bytes_stream)
  180. except (socket.error, socket.timeout) as e:
  181. raise ConnectionError('[-] Error: while writting to socket: (%s)' \
  182. % e.args)