123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # filename: connection.py
- import socket
- import os
- import sys
- import time
- import random
- from itertools import chain
- from fdfs_client.exceptions import (
- FDFSError,
- ConnectionError,
- ResponseError,
- InvaildResponse,
- DataError
- )
- # start class Connection
- class Connection(object):
- """Manage TCP comunication to and from Fastdfs Server."""
- def __init__(self, **conn_kwargs):
- self.pid = os.getpid()
- self.host_tuple = conn_kwargs['host_tuple']
- self.remote_port = conn_kwargs['port']
- self.remote_addr = None
- self.timeout = conn_kwargs['timeout']
- self._sock = None
- def __del__(self):
- try:
- self.disconnect()
- except:
- pass
- def connect(self):
- """Connect to fdfs server."""
- if self._sock:
- return
- try:
- sock = self._connect()
- except socket.error as e:
- raise ConnectionError(self._errormessage(e))
- self._sock = sock
- #print '[+] Create a connection success.'
- #print '\tLocal address is %s:%s.' % self._sock.getsockname()
- #print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
-
- def _connect(self):
- '''Create TCP socket. The host is random one of host_tuple.'''
- self.remote_addr = random.choice(self.host_tuple)
- #print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
- #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- #sock.settimeout(self.timeout)
- sock = socket.create_connection((self.remote_addr, self.remote_port),self.timeout)
- return sock
- def disconnect(self):
- """Disconnect from fdfs server."""
- if self._sock is None:
- return
- try:
- self._sock.close()
- except socket.error as e:
- raise ConnectionError(self._errormessage(e))
- self._sock = None
- def get_sock(self):
- return self._sock
- def _errormessage(self, exception):
- # args for socket.error can either be (errno, "message")
- # or just "message" """
- if len(exception.args) == 1:
- return "[-] Error: connect to %s:%s. %s." % \
- (self.remote_addr, self.remote_port, exception.args[0])
- else:
- return "[-] Error: %s connect to %s:%s. %s." % \
- (exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
- # end class Connection
- # start ConnectionPool
- class ConnectionPool(object):
- """Generic Connection Pool"""
- def __init__(self, name='', conn_class=Connection,
- max_conn=None, **conn_kwargs):
- self.pool_name = name
- self.pid = os.getpid()
- self.conn_class = conn_class
- self.max_conn = max_conn or 2 ** 31
- self.conn_kwargs = conn_kwargs
- self._conns_created = 0
- self._conns_available = []
- self._conns_inuse = set()
- # print '[+] Create a connection pool success, name: %s.' % self.pool_name
- def _check_pid(self):
- if self.pid != os.getpid():
- self.destroy()
- self.__init__(self.pool_name, self.conn_class, self.max_conn, **self.conn_kwargs)
- def make_conn(self):
- """Create a new connection."""
- if self._conns_created >= self.max_conn:
- raise ConnectionError('[-] Error: Too many connections.')
- num_try = 3
- while True:
- try:
- if num_try <= 0:
- break
- conn_instance = self.conn_class(**self.conn_kwargs)
- conn_instance.connect()
- self._conns_created += 1
- break
- except ConnectionError as e:
- print(e)
- num_try -= 1
- conn_instance = None
- if num_try <= 0:
- raise ConnectionError("Fail to connect with Fdfs-server after trying 3 times")
- return conn_instance
- def get_connection(self):
- """Get a connection from pool."""
- self._check_pid()
- try:
- conn = self._conns_available.pop()
- # print '[+] Get a connection from pool %s.' % self.pool_name
- # print '\tLocal address is %s:%s.' % conn._sock.getsockname()
- # print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
- except IndexError:
- conn = self.make_conn()
- self._conns_inuse.add(conn)
- return conn
- def remove(self, conn):
- """Remove connection from pool."""
- if conn in self._conns_inuse:
- self._conns_inuse.remove(conn)
- self._conns_created -= 1
- if conn in self._conns_available:
- self._conns_available.remove(conn)
- self._conns_created -= 1
- def destroy(self):
- """Disconnect all connections in the pool."""
- all_conns = chain(self._conns_inuse, self._conns_available)
- for conn in all_conns:
- conn.disconnect()
- # print '[-] Destroy connection pool %s.' % self.pool_name
- def release(self, conn):
- """Release the connection back to the pool."""
- self._check_pid()
- if conn.pid == self.pid:
- self._conns_inuse.remove(conn)
- self._conns_available.append(conn)
- # print '[-] Release connection back to pool %s.' % self.pool_name
- # end ConnectionPool class
- def tcp_recv_response(conn, bytes_size, buffer_size = 1024):
- '''Receive response from server.
- It is not include tracker header.
- arguments:
- @conn: connection
- @bytes_size: int, will be received byte_stream size
- @buffer_size: int, receive buffer size
- @Return: tuple,(response, received_size)
- '''
- response = ''
- total_size = 0
- total_bytes_size = bytes_size
- try:
- while 1:
- if total_bytes_size - total_size <= buffer_size:
- resp = conn._sock.recv(buffer_size)
- response += resp
- total_size += len(resp)
- break
- resp = conn._sock.recv(buffer_size)
- response += resp
- total_size += len(resp)
-
- except (socket.error, socket.timeout) as e:
- raise ConnectionError('[-] Error: while reading from socket: (%s)' \
- % e.args)
- return (response, total_size)
- def tcp_send_data(conn, bytes_stream):
- """Send buffer to server.
- It is not include tracker header.
- arguments:
- @conn: connection
- @bytes_stream: trasmit buffer
- @Return bool
- """
- try:
- conn._sock.sendall(bytes_stream)
- except (socket.error, socket.timeout) as e:
- raise ConnectionError('[-] Error: while writting to socket: (%s)' \
- % e.args)
|