Просмотр исходного кода

Update connection.py

修复下载文件不完整问题
SurelyYouareJoking 7 лет назад
Родитель
Commit
64946a61b4
1 изменённый файл: 180 добавлено и 609 удалено
  1. fdfs_client/connection.py  (+180, −609)

+ 180 - 609
fdfs_client/connection.py

@@ -1,628 +1,199 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-# filename: storage_client.py
+# filename: connection.py
 
-import os, stat
-import struct
 import socket
-import datetime
-import errno
-from fdfs_client.fdfs_protol import *
-from fdfs_client.connection import *
-# from fdfs_client.sendfile import *
+import os
+import sys
+import time
+import random
+from itertools import chain
 from fdfs_client.exceptions import (
     FDFSError,
     ConnectionError,
     ResponseError,
     InvaildResponse,
     DataError
-)
-from fdfs_client.utils import *
-
-def tcp_send_file(conn, filename, buffer_size = 1024):
-    '''
-    Send file to server, and split into multiple pkgs while sending.
-    arguments:
-    @conn: connection
-    @filename: string
-    @buffer_size: int ,send buffer size
-    @Return int: file size if success else raise ConnectionError.
-    '''
-    file_size = 0
-    with open(filename, 'rb') as f:
-        while 1:
-            try:
-                send_buffer = f.read(buffer_size)
-                send_size = len(send_buffer)
-                if  send_size == 0:
-                    break
-                tcp_send_data(conn, send_buffer)
-                file_size += send_size
-            except ConnectionError, e:
-                raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
-            except IOError, e:
-                raise DataError('[-] Error while reading local file(%s).' % e.args)
-    return file_size
-
-def tcp_send_file_ex(conn, filename, buffer_size = 4096):
-    '''
-    Send file to server. Using linux system call 'sendfile'.
-    arguments:
-    @conn: connection
-    @filename: string
-    @return long, sended size
-    '''
-    if 'linux' not in sys.platform.lower():
-        raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
-    nbytes = 0
-    offset = 0
-    sock_fd = conn.get_sock().fileno()
-    with open(filename, 'rb') as f:
-        in_fd = f.fileno()
-        while 1:
-            try:
-                sent = sendfile(sock_fd, in_fd, offset, buffer_size)
-                if 0 == sent:
-                    break
-                nbytes += sent
-                offset += sent
-            except OSError, e:
-                if e.errno == errno.EAGAIN:
-                    continue
-                raise
-    return nbytes
-        
-
-def tcp_recv_file(conn, local_filename, file_size, buffer_size = 1024):
-    '''
-    Receive file from server, fragmented it while receiving and write to disk.
-    arguments:
-    @conn: connection
-    @local_filename: string
-    @file_size: int, remote file size
-    @buffer_size: int, receive buffer size
-    @Return int: file size if success else raise ConnectionError.
-    '''
-    total_file_size = 0
-    flush_size = 0
-    remain_bytes = file_size
-    with open(local_filename, 'wb+') as f:
-        diff_size = remain_bytes
-        while diff_size > buffer_size:
-            print diff_size
-            diff_size = remain_bytes - total_file_size
-            try:
-                if diff_size >= buffer_size:
-                    file_buffer, recv_size = tcp_recv_response(conn, buffer_size, \
-                                                               buffer_size)
-                else:
-                    file_buffer, recv_size = tcp_recv_response(conn, diff_size, \
-                                                               buffer_size)
-                f.write(file_buffer)
-                total_file_size += recv_size
-                print total_file_size, 'total_file_size'
-                flush_size += recv_size
-                if flush_size >= 4096:
-                    f.flush()
-                    flush_size = 0
-            except ConnectionError, e:
-                raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
-            except IOError, e:
-                raise DataError('[-] Error: while writting local file(%s).' % e.args)
-    return total_file_size
-    
-class Storage_client(object):
-    '''
-    The Class Storage_client for storage server.
-    Note: argument host_tuple of storage server ip address, that should be a single element.
-    '''
-    def __init__(self, *kwargs):
-        conn_kwargs = {
-            'name'       : 'Storage Pool',
-            'host_tuple' : (kwargs[0],),
-            'port'       : kwargs[1],
-            'timeout'    : kwargs[2]
-        }
-        self.pool = ConnectionPool(**conn_kwargs)
-        return None
+    )
+
+# start class Connection
+class Connection(object):
+    '''Manage TCP communication to and from Fastdfs Server.'''
+    def __init__(self, **conn_kwargs):
+        self.pid = os.getpid()
+        self.host_tuple = conn_kwargs['host_tuple']
+        self.remote_port = conn_kwargs['port']
+        self.remote_addr = None
+        self.timeout = conn_kwargs['timeout']
+        self._sock = None
 
     def __del__(self):
         try:
-            self.pool.destroy()
-            self.pool = None
+            self.disconnect()
         except:
             pass
 
-    def update_pool(self, old_store_serv, new_store_serv, timeout = 30):
-        '''
-        Update connection pool of storage client.
-        We need update connection pool of storage client, while storage server is changed.
-        but if server not changed, we do nothing.
-        '''
-        if old_store_serv.ip_addr == new_store_serv.ip_addr:
-            return None
-        self.pool.destroy()
-        conn_kwargs = {
-            'name'       : 'Storage_pool',
-            'host_tuple' : (new_store_serv.ip_addr,),
-            'port'       : new_store_serv.port,
-            'timeout'    : timeout
-        }
-        self.pool = ConnectionPool(**conn_kwargs)
-        return True
-        
-
-    def _storage_do_upload_file(self, tracker_client, store_serv, \
-                               file_buffer, file_size = None, upload_type = None, \
-                               meta_dict = None, cmd = None, master_filename = None, \
-                               prefix_name = None, file_ext_name = None):
-        '''
-        core of upload file.
-        arguments:
-        @tracker_client: Tracker_client, it is useful connect to tracker server
-        @store_serv: Storage_server, it is return from query tracker server
-        @file_buffer: string, file name or file buffer for send
-        @file_size: int
-        @upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
-                                     FDFS_UPLOAD_BY_BUFFER
-        @meta_dic: dictionary, store metadata in it
-        @cmd: int, reference fdfs protol
-        @master_filename: string, useful upload slave file
-        @prefix_name: string
-        @file_ext_name: string
-        @Return dictionary 
-                 {
-                     'Group name'      : group_name,
-                     'Remote file_id'  : remote_file_id,
-                     'Status'          : status,
-                     'Local file name' : local_filename,
-                     'Uploaded size'   : upload_size,
-                     'Storage IP'      : storage_ip
-                 }
-
-        '''
-        
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        master_filename_len = len(master_filename) if master_filename else 0
-        prefix_name_len = len(prefix_name) if prefix_name else 0
-        upload_slave = len(store_serv.group_name) and master_filename_len
-        file_ext_name = str(file_ext_name) if file_ext_name else ''
-        #non_slave_fmt |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
-        non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
-        #slave_fmt |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
-        #           -master_name(master_filename_len)-|
-        slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, \
-                                          FDFS_FILE_EXT_NAME_MAX_LEN, \
-                                          master_filename_len)
-        th.pkg_len = struct.calcsize(slave_fmt) if upload_slave \
-                                                else struct.calcsize(non_slave_fmt)
-        th.pkg_len += file_size
-        th.cmd = cmd
-        th.send_header(store_conn)
-        if upload_slave:
-            send_buffer = struct.pack(slave_fmt, master_filename_len, file_size, \
-                                       prefix_name, file_ext_name, \
-                                                  master_filename)
-        else:
-            send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, \
-                                                    file_size, file_ext_name)
-        try:
-            tcp_send_data(store_conn, send_buffer)
-            if upload_type == FDFS_UPLOAD_BY_FILENAME:
-                send_file_size = tcp_send_file(store_conn, file_buffer)
-            elif upload_type == FDFS_UPLOAD_BY_BUFFER:
-                tcp_send_data(store_conn, file_buffer)
-            elif upload_type == FDFS_UPLOAD_BY_FILE:
-                send_file_size = tcp_send_file_ex(store_conn, file_buffer)
-            th.recv_header(store_conn)
-            if th.status != 0:
-                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
-            recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
-            if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
-                errmsg = '[-] Error: Storage response length is not match, '
-                errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
-                raise ResponseError(errmsg)
-            #recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
-            recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
-                                 th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
-            (group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
-            remote_filename = remote_name.strip('\x00')
-            if meta_dict and len(meta_dict) > 0:
-                status = self.storage_set_metadata(tracker_client, store_serv, \
-                                        remote_filename, meta_dict)
-                if status != 0: 
-                    #rollback
-                    self.storage_delete_file(tracker_client, store_serv, remote_filename)
-                    raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        ret_dic = {
-            'Group name'      : group_name.strip('\x00'),
-            'Remote file_id'  : group_name.strip('\x00') + os.sep + \
-                                   remote_filename,
-            'Status'          : 'Upload successed.',
-            'Local file name' : file_buffer if (upload_type == FDFS_UPLOAD_BY_FILENAME \
-                                            or upload_type == FDFS_UPLOAD_BY_FILE) \
-                                            else '',
-            'Uploaded size'   : appromix(send_file_size) if (upload_type == \
-                                FDFS_UPLOAD_BY_FILENAME or upload_type == \
-                                FDFS_UPLOAD_BY_FILE) else appromix( len(file_buffer)),
-            'Storage IP'      : store_serv.ip_addr
-        }
-        return ret_dic
-
-    def storage_upload_by_filename(self, tracker_client, store_serv, filename, \
-                                   meta_dict = None):
-        file_size = os.stat(filename).st_size
-        file_ext_name = get_file_ext_name(filename)
-        return self._storage_do_upload_file(tracker_client, store_serv, filename, \
-                                            file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
-                                            None, file_ext_name)
-
-    def storage_upload_by_file(self, tracker_client, store_serv, filename, \
-                               meta_dict = None):
-        file_size = os.stat(filename).st_size
-        file_ext_name = get_file_ext_name(filename)
-        return self._storage_do_upload_file(tracker_client, store_serv, filename, \
-                                            file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
-                                            None, file_ext_name)
-
-    def storage_upload_by_buffer(self, tracker_client, store_serv, \
-                                 file_buffer, file_ext_name = None, meta_dict = None):
-        buffer_size = len(file_buffer)
-        return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
-                                            buffer_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
-                                            None, file_ext_name)
-
-    def storage_upload_slave_by_filename(self, tracker_client, store_serv, \
-                                         filename, prefix_name, remote_filename, \
-                                         meta_dict = None):
-        file_size = os.stat(filename).st_size
-        file_ext_name = get_file_ext_name(filename)
-        return self._storage_do_upload_file(tracker_client, store_serv, filename, \
-                                            file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
-                                            remote_filename, prefix_name, \
-                                            file_ext_name)
-
-    def storage_upload_slave_by_file(self, tracker_client, store_serv, \
-                                         filename, prefix_name, remote_filename, \
-                                         meta_dict = None):
-        file_size = os.stat(filename).st_size
-        file_ext_name = get_file_ext_name(filename)
-        return self._storage_do_upload_file(tracker_client, store_serv, filename, \
-                                            file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
-                                            remote_filename, prefix_name, \
-                                            file_ext_name)
-
-    def storage_upload_slave_by_buffer(self, tracker_client, store_serv, \
-                                       filebuffer, remote_filename, meta_dict, \
-                                       file_ext_name):
-        file_size = len(filebuffer)
-        return self._storage_do_upload_file(tracker_client, store_serv, \
-                                            filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER, \
-                                            meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
-                                            None, remote_filename, file_ext_name)
-
-    def storage_upload_appender_by_filename(self, tracker_client, store_serv, \
-                                            filename, meta_dict = None):
-        file_size = os.stat(filename).st_size
-        file_ext_name = get_file_ext_name(filename)
-        return self._storage_do_upload_file(tracker_client, store_serv, filename, \
-                                            file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
-                                            None, None, file_ext_name)
-
-    def storage_upload_appender_by_file(self, tracker_client, store_serv, \
-                                            filename, meta_dict = None):
-        file_size = os.stat(filename).st_size
-        file_ext_name = get_file_ext_name(filename)
-        return self._storage_do_upload_file(tracker_client, store_serv, filename, \
-                                            file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
-                                            None, None, file_ext_name)
-
-    def storage_upload_appender_by_buffer(self, tracker_client, store_serv, \
-                                          file_buffer, meta_dict = None, \
-                                          file_ext_name = None):
-        file_size = len(file_buffer)
-        return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
-                                            file_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
-                                            STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
-                                            None, None, file_ext_name)
-
-    def storage_delete_file(self, tracker_client, store_serv, remote_filename):
-        '''
-        Delete file from storage server.
-        '''
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
-        file_name_len = len(remote_filename)
-        th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
-        try:
-            th.send_header(store_conn)
-            #del_fmt: |-group_name(16)-filename(len)-|
-            del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
-            send_buffer = struct.pack(del_fmt, store_serv.group_name, remote_filename)
-            tcp_send_data(store_conn, send_buffer)
-            th.recv_header(store_conn)
-            #if th.status == 2:
-            #    raise DataError('[-] Error: remote file %s is not exist.' \
-            #                    % (store_serv.group_name + os.sep + remote_filename))
-            if th.status != 0:
-                raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
-            #recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        remote_filename = store_serv.group_name + os.sep + remote_filename
-        return ('Delete file successed.', remote_filename, store_serv.ip_addr)
-
-    def _storage_do_download_file(self, tracker_client, store_serv, file_buffer, \
-                                  offset, download_size, download_type, remote_filename):
-        '''
-        Core of download file from storage server.
-        You can choice download type, optional FDFS_DOWNLOAD_TO_FILE or 
-        FDFS_DOWNLOAD_TO_BUFFER. And you can choice file offset.
-        @Return dictionary
-            'Remote file name' : remote_filename,
-            'Content' : local_filename or buffer,
-            'Download size'   : download_size,
-            'Storage IP'      : storage_ip
-        '''
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        remote_filename_len = len(remote_filename)
-        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + \
-                     remote_filename_len
-        th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
-        try:
-            th.send_header(store_conn)
-            #down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
-            down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
-            send_buffer = struct.pack(down_fmt, offset, download_size, \
-                                      store_serv.group_name, remote_filename)
-            tcp_send_data(store_conn, send_buffer)
-            th.recv_header(store_conn)
-            #if th.status == 2:
-            #    raise DataError('[-] Error: remote file %s is not exist.' % 
-            #                    (store_serv.group_name + os.sep + remote_filename))
-            if th.status != 0:
-                raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
-            if download_type == FDFS_DOWNLOAD_TO_FILE:
-                total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
-            elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
-                recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        ret_dic = {
-            'Remote file_id' : store_serv.group_name + os.sep + remote_filename,
-            'Content' : file_buffer if download_type == \
-                                   FDFS_DOWNLOAD_TO_FILE else recv_buffer,
-            'Download size'   : appromix(total_recv_size),
-            'Storage IP'      : store_serv.ip_addr
-        }
-        return ret_dic
-
-    def storage_download_to_file(self, tracker_client, store_serv, local_filename, \
-                                 file_offset, download_bytes, remote_filename):
-        return self._storage_do_download_file(tracker_client, store_serv, local_filename, \
-                                              file_offset, download_bytes, \
-                                              FDFS_DOWNLOAD_TO_FILE, remote_filename)
-
-    def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, \
-                                   file_offset, download_bytes, remote_filename):
-        return self._storage_do_download_file(tracker_client, store_serv, file_buffer, \
-                                              file_offset, download_bytes, \
-                                              FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
-
-    def storage_set_metadata(self, tracker_client, store_serv, \
-                             remote_filename, meta_dict, \
-                             op_flag = STORAGE_SET_METADATA_FLAG_OVERWRITE):
-        ret = 0
-        conn = self.pool.get_connection()
-        remote_filename_len = len(remote_filename)
-        meta_buffer = fdfs_pack_metadata(meta_dict)
-        meta_len = len(meta_buffer)
-        th = Tracker_header()
-        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + \
-                         FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
-        th.cmd = STORAGE_PROTO_CMD_SET_METADATA
+    def connect(self):
+        '''Connect to fdfs server.'''
+        if self._sock:
+            return
         try:
-            th.send_header(conn)
-            #meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
-            #           -filename(remote_filename_len)-meta(meta_len)|
-            meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
-                                               remote_filename_len, meta_len)
-            send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, \
-                                      op_flag, store_serv.group_name, \
-                                      remote_filename, meta_buffer)
-            tcp_send_data(conn, send_buffer)
-            th.recv_header(conn)
-            if th.status != 0 :
-                ret = th.status
-        except:
-            raise
-        finally:
-            self.pool.release(conn)
-        return ret
-
-    def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        remote_filename_len = len(remote_file_name)
-        th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
-        th.cmd = STORAGE_PROTO_CMD_GET_METADATA
-        try:
-            th.send_header(store_conn)
-            #meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
-            meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
-            send_buffer = struct.pack(meta_fmt, store_serv.group_name, remote_file_name)
-            tcp_send_data(store_conn, send_buffer)
-            th.recv_header(store_conn)
-            #if th.status == 2:
-            #    raise DataError('[-] Error: Remote file %s has no meta data.' \
-            #                    % (store_serv.group_name + os.sep + remote_file_name))
-            if th.status != 0:
-                raise DataError('[-] Error:%d, %s' % (th.status, os.strerror(th.status)))
-            if th.pkg_len == 0:
-                ret_dict = {}
-            meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        ret_dict = fdfs_unpack_metadata(meta_buffer)
-        return ret_dict
-
-    def _storage_do_append_file(self, tracker_client, store_serv, file_buffer, \
-                                file_size, upload_type, appended_filename):
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        appended_filename_len = len(appended_filename)
-        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
-        th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
-        try:
-            th.send_header(store_conn)
-            #append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
-            #             -filecontent(filesize)-|
-            append_fmt = '!Q Q %ds' % appended_filename_len
-            send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, \
-                                      appended_filename)
-            tcp_send_data(store_conn, send_buffer)
-            if upload_type == FDFS_UPLOAD_BY_FILENAME:
-                tcp_send_file(store_conn, file_buffer)
-            elif upload_type == FDFS_UPLOAD_BY_BUFFER:
-                tcp_send_data(store_conn, file_buffer)
-            elif upload_type == FDFS_UPLOAD_BY_FILE:
-                tcp_send_file_ex(store_conn, file_buffer)
-            th.recv_header(store_conn)
-            if th.status != 0:
-                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        ret_dict = {}
-        ret_dict['Status'] = 'Append file successed.'
-        ret_dict['Appender file name'] = store_serv.group_name + os.sep + appended_filename
-        ret_dict['Appended size'] = appromix(file_size)
-        ret_dict['Storage IP'] = store_serv.ip_addr
-        return ret_dict
-
-    def storage_append_by_filename(self, tracker_client, store_serv, \
-                                   local_filename, appended_filename):
-        file_size = os.stat(local_filename).st_size
-        return self._storage_do_append_file(tracker_client, store_serv, \
-                                            local_filename, file_size, \
-                                            FDFS_UPLOAD_BY_FILENAME, appended_filename)
-
-    def storage_append_by_file(self, tracker_client, store_serv, \
-                                   local_filename, appended_filename):
-        file_size = os.stat(local_filename).st_size
-        return self._storage_do_append_file(tracker_client, store_serv, \
-                                            local_filename, file_size, \
-                                            FDFS_UPLOAD_BY_FILE, appended_filename)
-
-    def storage_append_by_buffer(self, tracker_client, store_serv, \
-                                 file_buffer, appended_filename):
-        file_size = len(file_buffer)
-        return self._storage_do_append_file(tracker_client, store_serv, \
-                                            file_buffer, file_size, \
-                                            FDFS_UPLOAD_BY_BUFFER, appended_filename)
-
-    def _storage_do_truncate_file(self, tracker_client, store_serv, \
-                                 truncated_filesize, appender_filename):
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
-        appender_filename_len = len(appender_filename)
-        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
+            sock = self._connect()
+        except socket.error, e:
+            raise ConnectionError(self._errormessage(e))
+        self._sock = sock
+        #print '[+] Create a connection success.'
+        #print '\tLocal address is %s:%s.' % self._sock.getsockname()
+        #print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
+        
+    def _connect(self):
+        '''Create TCP socket. The host is random one of host_tuple.'''
+        self.remote_addr = random.choice(self.host_tuple)
+        #print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
+        #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        #sock.settimeout(self.timeout)
+        sock = socket.create_connection((self.remote_addr, self.remote_port),self.timeout)
+        return sock
+
+    def disconnect(self):
+        '''Disconnect from fdfs server.'''
+        if self._sock is None:
+            return
         try:
-            th.send_header(store_conn)
-            #truncate_fmt:|-appender_filename_len(8)-truncate_filesize(8)
-            #              -appender_filename(len)-|
-            truncate_fmt = '!Q Q %ds' % appender_filename_len
-            send_buffer = struct.pack(truncate_fmt, appender_filename_len, \
-                                      truncated_filesize, appender_filename)
-            tcp_send_data(store_conn, send_buffer)
-            th.recv_header(store_conn)
-            if th.status != 0:
-                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        ret_dict = {}
-        ret_dict['Status'] = 'Truncate successed.'
-        ret_dict['Storage IP'] = store_serv.ip_addr
-        return ret_dict
-
-    def storage_truncate_file(self, tracker_client, store_serv, \
-                              truncated_filesize, appender_filename):
-        return self._storage_do_truncate_file(tracker_client, store_serv, \
-                                              truncated_filesize, appender_filename)
-
-    def _storage_do_modify_file(self, tracker_client, store_serv, upload_type, \
-                               filebuffer, offset, filesize, appender_filename):
-        store_conn = self.pool.get_connection()
-        th = Tracker_header()
-        th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
-        appender_filename_len = len(appender_filename)
-        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
+            self._sock.close()
+        except socket.error, e:
+            raise ConnectionError(self._errormessage(e))
+        self._sock = None
+
+    def get_sock(self):
+        return self._sock
+
+    def _errormessage(self, exception):
+        # args for socket.error can either be (errno, "message")
+        # or just "message"
+        if len(exception.args) == 1:
+            return "[-] Error: connect to %s:%s. %s." % \
+                (self.remote_addr, self.remote_port, exception.args[0])
+        else:
+            return "[-] Error: %s connect to %s:%s. %s." % \
+                (exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
+# end class Connection
+
+# start ConnectionPool
+class ConnectionPool(object):
+    '''Generic Connection Pool'''
+    def __init__(self, name = '', conn_class = Connection,
+                 max_conn = None, **conn_kwargs):
+        # Pool identity and the pid of the owning process (used by
+        # _check_pid() to detect forks).
+        self.pool_name = name
+        self.pid = os.getpid()
+        # Connection factory class and the kwargs passed to it.
+        self.conn_class = conn_class
+        self.max_conn = max_conn or 2**31
+        self.conn_kwargs = conn_kwargs
+        # Bookkeeping: total created, idle list, in-use set.
+        self._conns_created = 0
+        self._conns_available = []
+        self._conns_inuse = set()
+        #print '[+] Create a connection pool success, name: %s.' % self.pool_name
+
+    def _check_pid(self):
+        if self.pid != os.getpid():
+            self.destroy()
+            self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
+
+    def make_conn(self):
+        '''Create a new connection.'''
+        if self._conns_created >= self.max_conn:
+            raise ConnectionError('[-] Error: Too many connections.')
+        num_try = 10
+        while True:
+            try:
+                if num_try <= 0:
+                    sys.exit()
+                conn_instance = self.conn_class(**self.conn_kwargs)
+                conn_instance.connect()
+                self._conns_created += 1
+                break
+            except ConnectionError, e:
+                print e
+                num_try -= 1
+                conn_instance = None
+        return conn_instance
+
+    def get_connection(self):
+        '''Get a connection from pool.
+
+        Pops an idle connection when one is available, otherwise creates a
+        new one via make_conn(); the connection is tracked in the in-use
+        set until release() returns it.
+        '''
+        self._check_pid()
         try:
-            th.send_header(store_conn)
-            #modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
-            modify_fmt = '!Q Q Q %ds' % appender_filename_len
-            send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, \
-                                      filesize, appender_filename)
-            tcp_send_data(store_conn, send_buffer)
-            if upload_type == FDFS_UPLOAD_BY_FILENAME:
-                upload_size = tcp_send_file(store_conn, filebuffer)
-            elif upload_type == FDFS_UPLOAD_BY_BUFFER:
-                tcp_send_data(store_conn, filebuffer)
-            elif upload_type == FDFS_UPLOAD_BY_FILE:
-                upload_size = tcp_send_file_ex(store_conn, filebuffer)
-            th.recv_header(store_conn)
-            if th.status != 0:
-                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
-        except:
-            raise
-        finally:
-            self.pool.release(store_conn)
-        ret_dict = {}
-        ret_dict['Status'] = 'Modify successed.'
-        ret_dict['Storage IP'] = store_serv.ip_addr
-        return ret_dict
-
-    def storage_modify_by_filename(self, tracker_client, store_serv, \
-                               filename, offset, \
-                               filesize, appender_filename):
-        return self._storage_do_modify_file(tracker_client, store_serv, \
-                                            FDFS_UPLOAD_BY_FILENAME, filename, offset, \
-                                            filesize, appender_filename)
-
-    def storage_modify_by_file(self, tracker_client, store_serv, \
-                               filename, offset, \
-                               filesize, appender_filename):
-        return self._storage_do_modify_file(tracker_client, store_serv, \
-                                            FDFS_UPLOAD_BY_FILE, filename, offset, \
-                                            filesize, appender_filename)
-
-    def storage_modify_by_buffer(self, tracker_client, store_serv, \
-                                 filebuffer, offset, \
-                                 filesize, appender_filename):
-        return self._storage_do_modify_file(tracker_client, store_serv, \
-                                            FDFS_UPLOAD_BY_BUFFER, filebuffer, offset, \
-                                            filesize, appender_filename)
+            conn = self._conns_available.pop()
+            #print '[+] Get a connection from pool %s.' % self.pool_name
+            #print '\tLocal address is %s:%s.' % conn._sock.getsockname()
+            #print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
+        except IndexError:
+            conn = self.make_conn()
+        self._conns_inuse.add(conn)
+        return conn
+
+    def remove(self, conn):
+        '''Remove connection from pool.'''
+        if conn in self._conns_inuse:
+            self._conns_inuse.remove(conn)
+            self._conns_created -= 1
+        if conn in self._conns_available:
+            self._conns_available.remove(conn)
+            self._conns_created -= 1
+
+    def destroy(self):
+        '''Disconnect all connections in the pool.'''
+        all_conns = chain(self._conns_inuse, self._conns_available)
+        for conn in all_conns:
+            conn.disconnect()
+        #print '[-] Destroy connection pool %s.' % self.pool_name
+
+    def release(self, conn):
+        '''Release the connection back to the pool for reuse.'''
+        self._check_pid()
+        if conn.pid == self.pid:
+            self._conns_inuse.remove(conn)
+            self._conns_available.append(conn)
+        # NOTE(review): a connection whose conn.pid differs from self.pid
+        # (created before a fork) is silently dropped here without
+        # disconnect() -- presumably intentional, but the fd leaks; confirm.
+        #print '[-] Release connection back to pool %s.' % self.pool_name
+# end ConnectionPool class
+
+def tcp_recv_response(conn, bytes_size, buffer_size = 1024):
+    '''Receive response from server.
+        It is not include tracker header.
+        arguments:
+        @conn: connection
+        @bytes_size: int, will be received byte_stream size
+        @buffer_size: int, receive buffer size
+        @Return: tuple,(response, received_size)
+    '''
+    response = ''
+    total_size = 0
+    total_bytes_size = bytes_size
+    try:
+        while 1:
+            if total_bytes_size - total_size <= buffer_size:
+                resp = conn._sock.recv(buffer_size)
+                response += resp
+                total_size += len(resp)
+                break
+            resp = conn._sock.recv(buffer_size)
+            response += resp
+            total_size += len(resp)
+            
+    except (socket.error, socket.timeout), e:
+            raise ConnectionError('[-] Error: while reading from socket: (%s)' \
+                                    % e.args)
+    return (response, total_size)
+
+def tcp_send_data(conn, bytes_stream):
+    '''Send buffer to server.
+        It is not include tracker header.  sendall() loops internally
+        until the whole buffer is transmitted or an error occurs.
+        arguments:
+        @conn: connection
+        @bytes_stream: trasmit buffer
+        @Return None; raises ConnectionError on socket failure
+    '''
+    try:
+        conn._sock.sendall(bytes_stream)
+    except (socket.error, socket.timeout), e:
+        raise ConnectionError('[-] Error: while writting to socket: (%s)' \
+                                % e.args)