
Merge pull request #1 from tenmachow/master

fixed some connection bugs
Kaifeng Xu 10 years ago
parent
commit
de787d6a57

+ 3 - 3
fdfs_client/client.conf

@@ -7,12 +7,12 @@ connect_timeout=30
 network_timeout=60
 
 # the base path to store log files
-base_path=/home/scott/fastdfs
+base_path=/home/tenma/fastdfs
 
 # tracker_server can ocur more than once, and tracker_server format is
 #  "host:port", host can be hostname or ip address
-tracker_server=192.168.243.133:22122
-tracker_server=192.168.243.135:22122
+tracker_server=127.0.0.1:22122
+#tracker_server=192.168.243.135:22122
 
 #standard log level as syslog, case insensitive, value list:
 ### emerg for emergency
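
Note: with the client.py change below, each tracker_server line in this config is parsed into an (ip, port) pair. A minimal sketch of that mapping, using the two addresses that appear in this config (ports stay strings, exactly as in get_tracker_conf):

    tracker_list = ['127.0.0.1:22122', '192.168.243.135:22122']
    tracker_ip_list = []
    for tr in tracker_list:
        tracker_ip, tracker_port = tr.split(':')
        tracker_ip_list.append((tracker_ip, tracker_port))
    host_tuple = tuple(tracker_ip_list)
    # host_tuple == (('127.0.0.1', '22122'), ('192.168.243.135', '22122'))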

+ 19 - 15
fdfs_client/client.py

@@ -26,9 +26,8 @@ def get_tracker_conf(conf_path = 'client.conf'):
         tracker_ip_list = []
         for tr in tracker_list:
             tracker_ip, tracker_port = tr.split(':')
-            tracker_ip_list.append(tracker_ip)
+            tracker_ip_list.append((tracker_ip, tracker_port))
         tracker['host_tuple'] = tuple(tracker_ip_list)
-        tracker['port']       = int(tracker_port)
         tracker['timeout']    = timeout
         tracker['name']       = 'Tracker Pool'
     except:
@@ -47,6 +46,7 @@ class Fdfs_client(object):
         self.trackers = get_tracker_conf(conf_path)
         self.tracker_pool = poolclass(**self.trackers)
         self.timeout  = self.trackers['timeout']
+        self.storages = {}
         return None
 
     def __del__(self):
@@ -56,6 +56,13 @@ class Fdfs_client(object):
         except:
             pass
 
+    def get_storage(self, store_serv):
+        store = self.storages.get((store_serv.ip_addr, store_serv.port), None)
+        if store is None:
+            store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+            self.storages[(store_serv.ip_addr, store_serv.port)] = store
+        return store
+
     def upload_by_filename(self, filename, meta_dict = None):
         '''
         Upload a file to Storage server.
@@ -81,8 +88,7 @@ class Fdfs_client(object):
             raise DataError(errmsg + '(uploading)')
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_without_group()
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
-        return store.storage_upload_by_filename(tc, store_serv, filename, meta_dict)
+        return self.get_storage(store_serv).storage_upload_by_filename(tc, store_serv, filename, meta_dict)
 
     def upload_by_file(self, filename, meta_dict = None):
         isfile, errmsg = fdfs_check_file(filename)
@@ -90,8 +96,7 @@ class Fdfs_client(object):
             raise DataError(errmsg + '(uploading)')
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_without_group()
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
-        return store.storage_upload_by_file(tc, store_serv, filename, meta_dict)
+        return self.get_storage(store_serv).storage_upload_by_file(tc, store_serv, filename, meta_dict)
 
     def upload_by_buffer(self, filebuffer, file_ext_name = None, meta_dict = None):
         '''
@@ -118,8 +123,7 @@ class Fdfs_client(object):
             raise DataError('[-] Error: argument filebuffer can not be null.')
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_without_group()
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
-        return store.storage_upload_by_buffer(tc, store_serv, filebuffer, \
+        return self.get_storage(store_serv).storage_upload_by_buffer(tc, store_serv, filebuffer, \
                                               file_ext_name, meta_dict)
 
     def upload_slave_by_filename(self, filename, remote_file_id, prefix_name, \
@@ -155,7 +159,7 @@ class Fdfs_client(object):
         group_name, remote_filename = tmp
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_with_group(group_name)
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         try:
             ret_dict = store.storage_upload_slave_by_filename(tc, store_serv, filename, \
                                                           prefix_name, remote_filename, \
@@ -198,7 +202,7 @@ class Fdfs_client(object):
         group_name, remote_filename = tmp
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_with_group(group_name)
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         try:
             ret_dict = store.storage_upload_slave_by_file(tc, store_serv, filename, \
                                                           prefix_name, remote_filename, \
@@ -237,7 +241,7 @@ class Fdfs_client(object):
         group_name, remote_filename = tmp
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         return store.storage_upload_slave_by_buffer(tc, store_serv, filebuffer, \
                                                     remote_filename, meta_dict, \
                                                     file_ext_name)
@@ -267,7 +271,7 @@ class Fdfs_client(object):
             raise DataError(errmsg + '(uploading appender)')
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_without_group()
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         return store.storage_upload_appender_by_filename(tc, store_serv, \
                                                          local_filename, meta_dict)
 
@@ -296,7 +300,7 @@ class Fdfs_client(object):
             raise DataError(errmsg + '(uploading appender)')
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_without_group()
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         return store.storage_upload_appender_by_file(tc, store_serv, \
                                                          local_filename, meta_dict)
 
@@ -320,7 +324,7 @@ class Fdfs_client(object):
             raise DataError('[-] Error: argument filebuffer can not be null.')
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_stor_without_group()
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         return store.storage_upload_appender_by_buffer(tc, store_serv, \
                                                        filebuffer, meta_dict, \
                                                        file_ext_name)
@@ -338,7 +342,7 @@ class Fdfs_client(object):
         group_name, remote_filename = tmp
         tc = Tracker_client(self.tracker_pool)
         store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
-        store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
+        store = self.get_storage(store_serv)
         return store.storage_delete_file(tc, store_serv, remote_filename)
 
     def download_to_file(self, local_filename, remote_file_id, offset = 0, down_bytes = 0):
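
Note: the repeated Storage_client construction in client.py is replaced by get_storage, which caches one Storage_client per (ip_addr, port) in self.storages. A minimal usage sketch, assuming the constructor takes the config path as in the hunk above (paths and filenames are illustrative):

    from fdfs_client.client import Fdfs_client

    client = Fdfs_client('client.conf')
    ret = client.upload_by_filename('/tmp/demo.jpg')
    # A second upload routed to the same storage server reuses the cached
    # Storage_client in client.storages[(ip_addr, port)] instead of
    # building a new connection pool per call.
    ret = client.upload_by_filename('/tmp/demo2.jpg')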

+ 12 - 4
fdfs_client/connection.py

@@ -22,7 +22,7 @@ class Connection(object):
     def __init__(self, **conn_kwargs):
         self.pid = os.getpid()
         self.host_tuple = conn_kwargs['host_tuple']
-        self.remote_port = conn_kwargs['port']
+        self.remote_port = None
         self.remote_addr = None
         self.timeout = conn_kwargs['timeout']
         self._sock = None
@@ -45,10 +45,18 @@ class Connection(object):
         #print '[+] Create a connection success.'
         #print '\tLocal address is %s:%s.' % self._sock.getsockname()
         #print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
+
+    def sendall(self, msg):
+        if not self._sock:
+            self.connect()
+        self._sock.sendall(msg)
         
+    def recv(self, len):
+        return self._sock.recv(len)
+
     def _connect(self):
         '''Create TCP socket. The host is random one of host_tuple.'''
-        self.remote_addr = random.choice(self.host_tuple)
+        self.remote_addr, self.remote_port = random.choice(self.host_tuple)
         #print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
         #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         #sock.settimeout(self.timeout)
@@ -62,7 +70,7 @@ class Connection(object):
         try:
             self._sock.close()
         except socket.error, e:
-            raise ConnectionError(self._errormessage(e))
+            pass
         self._sock = None
 
     def get_sock(self):
@@ -97,7 +105,7 @@ class ConnectionPool(object):
     def _check_pid(self):
         if self.pid != os.getpid():
             self.destroy()
-            self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
+            self.__init__(self.pool_name, self.conn_class, self.max_conn, **self.conn_kwargs)
 
     def make_conn(self):
         '''Create a new connection.'''
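
Note: host_tuple now holds (host, port) pairs, so _connect draws both fields from a single random.choice, and sendall connects lazily when no socket exists yet. The selection logic in isolation (addresses are illustrative):

    import random

    host_tuple = (('127.0.0.1', '22122'), ('192.168.243.135', '22122'))
    remote_addr, remote_port = random.choice(host_tuple)
    # Address and port always come from the same tracker entry, so they
    # can no longer be mismatched across trackers.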

+ 5 - 2
fdfs_client/fdfs_protol.py

@@ -98,6 +98,7 @@ FDFS_RECORD_SEPERATOR      =	'\x01'
 FDFS_FIELD_SEPERATOR       =	'\x02'
 
 #common constants
+FDFS_STORAGE_ID_MAX_SIZE   =    16
 FDFS_GROUP_NAME_MAX_LEN	   =    16
 IP_ADDRESS_SIZE            =    16
 FDFS_PROTO_PKG_LEN_SIZE	   =	8
@@ -194,7 +195,7 @@ class Tracker_header(object):
         '''Send Tracker header to server.'''
         header = self._pack(self.pkg_len, self.cmd, self.status)
         try:
-            conn._sock.sendall(header)
+            conn.sendall(header)
         except (socket.error, socket.timeout), e:
             raise ConnectionError('[-] Error: while writting to socket: %s' \
                                   % (e.args,))
@@ -204,10 +205,12 @@ class Tracker_header(object):
            if sucess, class member (pkg_len, cmd, status) is response.
         '''
         try:
-            header = conn._sock.recv(self.header_len())
+            header = conn.recv(self.header_len())
         except (socket.error, socket.timeout), e:
             raise ConnectionError('[-] Error: while reading from socket: %s' \
                                   % (e.args,))
+        if not header:
+            raise ConnectionError("Socket closed on remote end")
         self._unpack(header)
 
 def fdfs_pack_metadata(meta_dict):
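
Note: send_header and recv_header now go through the Connection wrappers, and an empty read is treated as a closed peer. A sketch of the header framing, assuming the usual FastDFS 10-byte header (8-byte pkg_len, 1-byte cmd, 1-byte status); the '!QBB' format string is an assumption, not copied from this file:

    import struct

    fmt = '!QBB'                              # assumed: pkg_len, cmd, status
    header = struct.pack(fmt, 0, 101, 0)      # illustrative cmd/status values
    assert struct.calcsize(fmt) == len(header) == 10
    pkg_len, cmd, status = struct.unpack(fmt, header)
    # recv_header raises ConnectionError when conn.recv() returns '' (peer closed).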

+ 2 - 4
fdfs_client/storage_client.py

@@ -116,8 +116,7 @@ class Storage_client(object):
     def __init__(self, *kwargs):
         conn_kwargs = {
             'name'       : 'Storage Pool',
-            'host_tuple' : (kwargs[0],),
-            'port'       : kwargs[1],
+            'host_tuple' : ((kwargs[0],kwargs[1]),),
             'timeout'    : kwargs[2]
         }
         self.pool = ConnectionPool(**conn_kwargs)
@@ -141,8 +140,7 @@ class Storage_client(object):
         self.pool.destroy()
         conn_kwargs = {
             'name'       : 'Storage_pool',
-            'host_tuple' : (new_store_serv.ip_addr,),
-            'port'       : new_store_serv.port,
+            'host_tuple' : ((new_store_serv.ip_addr,new_store_serv.port),),
             'timeout'    : timeout
         }
         self.pool = ConnectionPool(**conn_kwargs)
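
Note: the Storage Pool kwargs now carry a single (ip, port) pair in host_tuple instead of a separate port key, matching the Connection change above. Illustrative values (the address is hypothetical):

    ip_addr, port, timeout = '192.168.1.10', 23000, 30
    conn_kwargs = {
        'name'       : 'Storage Pool',
        'host_tuple' : ((ip_addr, port),),    # one (ip, port) entry
        'timeout'    : timeout,
    }
    # random.choice over a one-element host_tuple always picks this server.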

+ 19 - 6
fdfs_client/tracker_client.py

@@ -36,6 +36,7 @@ def parse_storage_status(status_code):
 class Storage_info(object):
     def __init__(self):
         self.status = 0
+        self.id = ''
         self.ip_addr = ''
         self.domain_name = ''
         self.src_ip_addr = ''
@@ -94,13 +95,14 @@ class Storage_info(object):
         self.last_heartbeat_time = datetime.fromtimestamp(0).isoformat()
         self.if_trunk_server = 0
         #fmt = |-status(1)-ipaddr(16)-domain(128)-srcipaddr(16)-ver(6)-52*8-|
-        self.fmt = '!B %ds %ds %ds %ds 52QB' % (IP_ADDRESS_SIZE, \
-                                               FDFS_DOMAIN_NAME_MAX_LEN,
+        self.fmt = '!B %ds %ds %ds %ds %ds 52QB' % (FDFS_STORAGE_ID_MAX_SIZE, \
+                                               IP_ADDRESS_SIZE, \
+                                               FDFS_DOMAIN_NAME_MAX_LEN, \
                                                IP_ADDRESS_SIZE, \
                                            FDFS_VERSION_SIZE)
 
     def set_info(self, bytes_stream):
-        (self.status, ip_addr, domain_name, \
+        (self.status, id, ip_addr, domain_name, \
          src_ip_addr, version, join_time,up_time, \
          totalMB, freeMB, self.upload_prio, \
          self.store_path_count, self.subdir_count_per_path, \
@@ -129,6 +131,7 @@ class Storage_info(object):
          last_heartbeat_time, self.if_trunk_server) \
           = struct.unpack(self.fmt, bytes_stream)
         try:
+            self.id = id.strip('\x00')
             self.ip_addr = ip_addr.strip('\x00')
             self.domain_name = domain_name.strip('\x00')
             self.version = version.strip('\x00')
@@ -150,6 +153,7 @@ class Storage_info(object):
         '''Transform to readable string.'''
         
         s  = 'Storage information:\n'
+        s += '\tid = %s\n' % (self.id)
         s += '\tip_addr = %s (%s)\n' % (self.ip_addr, parse_storage_status(self.status))
         s += '\thttp domain = %s\n' % self.domain_name
         s += '\tversion = %s\n' % self.version
@@ -215,6 +219,7 @@ class Storage_info(object):
 class Group_info(object):
     def __init__(self):
         self.group_name            = ''
+        self.totalMB               = ''
         self.freeMB                = ''
         self.trunk_freeMB          = ''
         self.count                 = 0
@@ -225,13 +230,14 @@ class Group_info(object):
         self.store_path_count      = 0
         self.subdir_count_per_path = 0
         self.curr_trunk_file_id    = 0
-        self.fmt                   = '!%ds 10Q' % (FDFS_GROUP_NAME_MAX_LEN + 1)
+        self.fmt                   = '!%ds 11Q' % (FDFS_GROUP_NAME_MAX_LEN + 1)
         return None
 
     def __str__(self):
 
         s = 'Group information:\n'
         s += '\tgroup name = %s\n' % self.group_name
+        s += '\tdisk total space = %s\n' % self.totalMB
         s += '\tdisk free space = %s\n' % self.freeMB
         s += '\ttrunk free space = %s\n' % self.trunk_freeMB
         s += '\tstorage server count = %d\n' % self.count
@@ -245,12 +251,13 @@ class Group_info(object):
         return s
 
     def set_info(self, bytes_stream):
-        (group_name, freeMB, trunk_freeMB, self.count, self.storage_port, \
+        (group_name, totalMB, freeMB, trunk_freeMB, self.count, self.storage_port, \
          self.store_http_port, self.active_count, self.curr_write_server, \
          self.store_path_count, self.subdir_count_per_path, self.curr_trunk_file_id) \
         = struct.unpack(self.fmt, bytes_stream)
         try:
             self.group_name = group_name.strip('\x00')
+            self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
             self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
             self.trunk_freeMB = appromix(trunk_freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
         except ValueError:
@@ -295,6 +302,7 @@ class Tracker_client(object):
                                    % (th.pkg_len, recv_size)
                 raise ResponseError(errinfo)
         except ConnectionError:
+            conn.disconnect()
             raise
         finally:
             self.pool.release(conn)
@@ -330,6 +338,7 @@ class Tracker_client(object):
             group_info = Group_info()
             group_info.set_info(recv_buffer)
         except ConnectionError:
+            conn.disconnect()
             raise
         finally:
             self.pool.release(conn)
@@ -345,7 +354,8 @@ class Tracker_client(object):
             if th.status != 0:
                 raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
             recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
-        except:
+        except ConnectionError:
+            conn.disconnect()
             raise
         finally:
             self.pool.release(conn)
@@ -387,6 +397,7 @@ class Tracker_client(object):
                          % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
                 raise ResponseError(errmsg)
         except ConnectionError:
+            conn.disconnect()
             raise
         finally:
             self.pool.release(conn)
@@ -424,6 +435,7 @@ class Tracker_client(object):
                             % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
                 raise ResponseError(errmsg)
         except ConnectionError:
+            conn.disconnect()
             raise
         finally:
             self.pool.release(conn)
@@ -465,6 +477,7 @@ class Tracker_client(object):
                 errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
                 raise ResponseError(errmsg)
         except ConnectionError:
+            conn.disconnect()
             raise
         finally:
             self.pool.release(conn)
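
Note: both unpack formats grow by one field: Storage_info gains a 16-byte storage id and Group_info gains totalMB (10Q becomes 11Q). A quick size check with struct.calcsize, substituting the sizes indicated by the format comment in the hunk (id 16, ip 16, domain 128, src ip 16, version 6) and FDFS_GROUP_NAME_MAX_LEN = 16:

    import struct

    storage_fmt = '!B %ds %ds %ds %ds %ds 52QB' % (16, 16, 128, 16, 6)
    group_fmt   = '!%ds 11Q' % (16 + 1)

    print struct.calcsize(storage_fmt)   # 600 bytes per storage entry
    print struct.calcsize(group_fmt)     # 105 bytes per group entry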