connection.py 30 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # filename: storage_client.py
  4. import os, stat
  5. import struct
  6. import socket
  7. import datetime
  8. import errno
  9. from fdfs_client.fdfs_protol import *
  10. from fdfs_client.connection import *
  11. # from fdfs_client.sendfile import *
  12. from fdfs_client.exceptions import (
  13. FDFSError,
  14. ConnectionError,
  15. ResponseError,
  16. InvaildResponse,
  17. DataError
  18. )
  19. from fdfs_client.utils import *
  20. def tcp_send_file(conn, filename, buffer_size = 1024):
  21. '''
  22. Send file to server, and split into multiple pkgs while sending.
  23. arguments:
  24. @conn: connection
  25. @filename: string
  26. @buffer_size: int ,send buffer size
  27. @Return int: file size if success else raise ConnectionError.
  28. '''
  29. file_size = 0
  30. with open(filename, 'rb') as f:
  31. while 1:
  32. try:
  33. send_buffer = f.read(buffer_size)
  34. send_size = len(send_buffer)
  35. if send_size == 0:
  36. break
  37. tcp_send_data(conn, send_buffer)
  38. file_size += send_size
  39. except ConnectionError, e:
  40. raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
  41. except IOError, e:
  42. raise DataError('[-] Error while reading local file(%s).' % e.args)
  43. return file_size
  44. def tcp_send_file_ex(conn, filename, buffer_size = 4096):
  45. '''
  46. Send file to server. Using linux system call 'sendfile'.
  47. arguments:
  48. @conn: connection
  49. @filename: string
  50. @return long, sended size
  51. '''
  52. if 'linux' not in sys.platform.lower():
  53. raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
  54. nbytes = 0
  55. offset = 0
  56. sock_fd = conn.get_sock().fileno()
  57. with open(filename, 'rb') as f:
  58. in_fd = f.fileno()
  59. while 1:
  60. try:
  61. sent = sendfile(sock_fd, in_fd, offset, buffer_size)
  62. if 0 == sent:
  63. break
  64. nbytes += sent
  65. offset += sent
  66. except OSError, e:
  67. if e.errno == errno.EAGAIN:
  68. continue
  69. raise
  70. return nbytes
  71. def tcp_recv_file(conn, local_filename, file_size, buffer_size = 1024):
  72. '''
  73. Receive file from server, fragmented it while receiving and write to disk.
  74. arguments:
  75. @conn: connection
  76. @local_filename: string
  77. @file_size: int, remote file size
  78. @buffer_size: int, receive buffer size
  79. @Return int: file size if success else raise ConnectionError.
  80. '''
  81. total_file_size = 0
  82. flush_size = 0
  83. remain_bytes = file_size
  84. with open(local_filename, 'wb+') as f:
  85. diff_size = remain_bytes
  86. while diff_size > buffer_size:
  87. print diff_size
  88. diff_size = remain_bytes - total_file_size
  89. try:
  90. if diff_size >= buffer_size:
  91. file_buffer, recv_size = tcp_recv_response(conn, buffer_size, \
  92. buffer_size)
  93. else:
  94. file_buffer, recv_size = tcp_recv_response(conn, diff_size, \
  95. buffer_size)
  96. f.write(file_buffer)
  97. total_file_size += recv_size
  98. print total_file_size, 'total_file_size'
  99. flush_size += recv_size
  100. if flush_size >= 4096:
  101. f.flush()
  102. flush_size = 0
  103. except ConnectionError, e:
  104. raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
  105. except IOError, e:
  106. raise DataError('[-] Error: while writting local file(%s).' % e.args)
  107. return total_file_size
  108. class Storage_client(object):
  109. '''
  110. The Class Storage_client for storage server.
  111. Note: argument host_tuple of storage server ip address, that should be a single element.
  112. '''
  113. def __init__(self, *kwargs):
  114. conn_kwargs = {
  115. 'name' : 'Storage Pool',
  116. 'host_tuple' : (kwargs[0],),
  117. 'port' : kwargs[1],
  118. 'timeout' : kwargs[2]
  119. }
  120. self.pool = ConnectionPool(**conn_kwargs)
  121. return None
  122. def __del__(self):
  123. try:
  124. self.pool.destroy()
  125. self.pool = None
  126. except:
  127. pass
  128. def update_pool(self, old_store_serv, new_store_serv, timeout = 30):
  129. '''
  130. Update connection pool of storage client.
  131. We need update connection pool of storage client, while storage server is changed.
  132. but if server not changed, we do nothing.
  133. '''
  134. if old_store_serv.ip_addr == new_store_serv.ip_addr:
  135. return None
  136. self.pool.destroy()
  137. conn_kwargs = {
  138. 'name' : 'Storage_pool',
  139. 'host_tuple' : (new_store_serv.ip_addr,),
  140. 'port' : new_store_serv.port,
  141. 'timeout' : timeout
  142. }
  143. self.pool = ConnectionPool(**conn_kwargs)
  144. return True
  145. def _storage_do_upload_file(self, tracker_client, store_serv, \
  146. file_buffer, file_size = None, upload_type = None, \
  147. meta_dict = None, cmd = None, master_filename = None, \
  148. prefix_name = None, file_ext_name = None):
  149. '''
  150. core of upload file.
  151. arguments:
  152. @tracker_client: Tracker_client, it is useful connect to tracker server
  153. @store_serv: Storage_server, it is return from query tracker server
  154. @file_buffer: string, file name or file buffer for send
  155. @file_size: int
  156. @upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
  157. FDFS_UPLOAD_BY_BUFFER
  158. @meta_dic: dictionary, store metadata in it
  159. @cmd: int, reference fdfs protol
  160. @master_filename: string, useful upload slave file
  161. @prefix_name: string
  162. @file_ext_name: string
  163. @Return dictionary
  164. {
  165. 'Group name' : group_name,
  166. 'Remote file_id' : remote_file_id,
  167. 'Status' : status,
  168. 'Local file name' : local_filename,
  169. 'Uploaded size' : upload_size,
  170. 'Storage IP' : storage_ip
  171. }
  172. '''
  173. store_conn = self.pool.get_connection()
  174. th = Tracker_header()
  175. master_filename_len = len(master_filename) if master_filename else 0
  176. prefix_name_len = len(prefix_name) if prefix_name else 0
  177. upload_slave = len(store_serv.group_name) and master_filename_len
  178. file_ext_name = str(file_ext_name) if file_ext_name else ''
  179. #non_slave_fmt |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
  180. non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
  181. #slave_fmt |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
  182. # -master_name(master_filename_len)-|
  183. slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, \
  184. FDFS_FILE_EXT_NAME_MAX_LEN, \
  185. master_filename_len)
  186. th.pkg_len = struct.calcsize(slave_fmt) if upload_slave \
  187. else struct.calcsize(non_slave_fmt)
  188. th.pkg_len += file_size
  189. th.cmd = cmd
  190. th.send_header(store_conn)
  191. if upload_slave:
  192. send_buffer = struct.pack(slave_fmt, master_filename_len, file_size, \
  193. prefix_name, file_ext_name, \
  194. master_filename)
  195. else:
  196. send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, \
  197. file_size, file_ext_name)
  198. try:
  199. tcp_send_data(store_conn, send_buffer)
  200. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  201. send_file_size = tcp_send_file(store_conn, file_buffer)
  202. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  203. tcp_send_data(store_conn, file_buffer)
  204. elif upload_type == FDFS_UPLOAD_BY_FILE:
  205. send_file_size = tcp_send_file_ex(store_conn, file_buffer)
  206. th.recv_header(store_conn)
  207. if th.status != 0:
  208. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  209. recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  210. if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
  211. errmsg = '[-] Error: Storage response length is not match, '
  212. errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
  213. raise ResponseError(errmsg)
  214. #recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
  215. recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
  216. th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
  217. (group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
  218. remote_filename = remote_name.strip('\x00')
  219. if meta_dict and len(meta_dict) > 0:
  220. status = self.storage_set_metadata(tracker_client, store_serv, \
  221. remote_filename, meta_dict)
  222. if status != 0:
  223. #rollback
  224. self.storage_delete_file(tracker_client, store_serv, remote_filename)
  225. raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
  226. except:
  227. raise
  228. finally:
  229. self.pool.release(store_conn)
  230. ret_dic = {
  231. 'Group name' : group_name.strip('\x00'),
  232. 'Remote file_id' : group_name.strip('\x00') + os.sep + \
  233. remote_filename,
  234. 'Status' : 'Upload successed.',
  235. 'Local file name' : file_buffer if (upload_type == FDFS_UPLOAD_BY_FILENAME \
  236. or upload_type == FDFS_UPLOAD_BY_FILE) \
  237. else '',
  238. 'Uploaded size' : appromix(send_file_size) if (upload_type == \
  239. FDFS_UPLOAD_BY_FILENAME or upload_type == \
  240. FDFS_UPLOAD_BY_FILE) else appromix( len(file_buffer)),
  241. 'Storage IP' : store_serv.ip_addr
  242. }
  243. return ret_dic
  244. def storage_upload_by_filename(self, tracker_client, store_serv, filename, \
  245. meta_dict = None):
  246. file_size = os.stat(filename).st_size
  247. file_ext_name = get_file_ext_name(filename)
  248. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  249. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  250. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  251. None, file_ext_name)
  252. def storage_upload_by_file(self, tracker_client, store_serv, filename, \
  253. meta_dict = None):
  254. file_size = os.stat(filename).st_size
  255. file_ext_name = get_file_ext_name(filename)
  256. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  257. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  258. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  259. None, file_ext_name)
  260. def storage_upload_by_buffer(self, tracker_client, store_serv, \
  261. file_buffer, file_ext_name = None, meta_dict = None):
  262. buffer_size = len(file_buffer)
  263. return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
  264. buffer_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
  265. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  266. None, file_ext_name)
  267. def storage_upload_slave_by_filename(self, tracker_client, store_serv, \
  268. filename, prefix_name, remote_filename, \
  269. meta_dict = None):
  270. file_size = os.stat(filename).st_size
  271. file_ext_name = get_file_ext_name(filename)
  272. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  273. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  274. STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  275. remote_filename, prefix_name, \
  276. file_ext_name)
  277. def storage_upload_slave_by_file(self, tracker_client, store_serv, \
  278. filename, prefix_name, remote_filename, \
  279. meta_dict = None):
  280. file_size = os.stat(filename).st_size
  281. file_ext_name = get_file_ext_name(filename)
  282. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  283. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  284. STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  285. remote_filename, prefix_name, \
  286. file_ext_name)
  287. def storage_upload_slave_by_buffer(self, tracker_client, store_serv, \
  288. filebuffer, remote_filename, meta_dict, \
  289. file_ext_name):
  290. file_size = len(filebuffer)
  291. return self._storage_do_upload_file(tracker_client, store_serv, \
  292. filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER, \
  293. meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  294. None, remote_filename, file_ext_name)
  295. def storage_upload_appender_by_filename(self, tracker_client, store_serv, \
  296. filename, meta_dict = None):
  297. file_size = os.stat(filename).st_size
  298. file_ext_name = get_file_ext_name(filename)
  299. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  300. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  301. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  302. None, None, file_ext_name)
  303. def storage_upload_appender_by_file(self, tracker_client, store_serv, \
  304. filename, meta_dict = None):
  305. file_size = os.stat(filename).st_size
  306. file_ext_name = get_file_ext_name(filename)
  307. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  308. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  309. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  310. None, None, file_ext_name)
  311. def storage_upload_appender_by_buffer(self, tracker_client, store_serv, \
  312. file_buffer, meta_dict = None, \
  313. file_ext_name = None):
  314. file_size = len(file_buffer)
  315. return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
  316. file_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
  317. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  318. None, None, file_ext_name)
  319. def storage_delete_file(self, tracker_client, store_serv, remote_filename):
  320. '''
  321. Delete file from storage server.
  322. '''
  323. store_conn = self.pool.get_connection()
  324. th = Tracker_header()
  325. th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
  326. file_name_len = len(remote_filename)
  327. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
  328. try:
  329. th.send_header(store_conn)
  330. #del_fmt: |-group_name(16)-filename(len)-|
  331. del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
  332. send_buffer = struct.pack(del_fmt, store_serv.group_name, remote_filename)
  333. tcp_send_data(store_conn, send_buffer)
  334. th.recv_header(store_conn)
  335. #if th.status == 2:
  336. # raise DataError('[-] Error: remote file %s is not exist.' \
  337. # % (store_serv.group_name + os.sep + remote_filename))
  338. if th.status != 0:
  339. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  340. #recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  341. except:
  342. raise
  343. finally:
  344. self.pool.release(store_conn)
  345. remote_filename = store_serv.group_name + os.sep + remote_filename
  346. return ('Delete file successed.', remote_filename, store_serv.ip_addr)
  347. def _storage_do_download_file(self, tracker_client, store_serv, file_buffer, \
  348. offset, download_size, download_type, remote_filename):
  349. '''
  350. Core of download file from storage server.
  351. You can choice download type, optional FDFS_DOWNLOAD_TO_FILE or
  352. FDFS_DOWNLOAD_TO_BUFFER. And you can choice file offset.
  353. @Return dictionary
  354. 'Remote file name' : remote_filename,
  355. 'Content' : local_filename or buffer,
  356. 'Download size' : download_size,
  357. 'Storage IP' : storage_ip
  358. '''
  359. store_conn = self.pool.get_connection()
  360. th = Tracker_header()
  361. remote_filename_len = len(remote_filename)
  362. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + \
  363. remote_filename_len
  364. th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
  365. try:
  366. th.send_header(store_conn)
  367. #down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
  368. down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
  369. send_buffer = struct.pack(down_fmt, offset, download_size, \
  370. store_serv.group_name, remote_filename)
  371. tcp_send_data(store_conn, send_buffer)
  372. th.recv_header(store_conn)
  373. #if th.status == 2:
  374. # raise DataError('[-] Error: remote file %s is not exist.' %
  375. # (store_serv.group_name + os.sep + remote_filename))
  376. if th.status != 0:
  377. raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
  378. if download_type == FDFS_DOWNLOAD_TO_FILE:
  379. total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
  380. elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
  381. recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
  382. except:
  383. raise
  384. finally:
  385. self.pool.release(store_conn)
  386. ret_dic = {
  387. 'Remote file_id' : store_serv.group_name + os.sep + remote_filename,
  388. 'Content' : file_buffer if download_type == \
  389. FDFS_DOWNLOAD_TO_FILE else recv_buffer,
  390. 'Download size' : appromix(total_recv_size),
  391. 'Storage IP' : store_serv.ip_addr
  392. }
  393. return ret_dic
  394. def storage_download_to_file(self, tracker_client, store_serv, local_filename, \
  395. file_offset, download_bytes, remote_filename):
  396. return self._storage_do_download_file(tracker_client, store_serv, local_filename, \
  397. file_offset, download_bytes, \
  398. FDFS_DOWNLOAD_TO_FILE, remote_filename)
  399. def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, \
  400. file_offset, download_bytes, remote_filename):
  401. return self._storage_do_download_file(tracker_client, store_serv, file_buffer, \
  402. file_offset, download_bytes, \
  403. FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
  404. def storage_set_metadata(self, tracker_client, store_serv, \
  405. remote_filename, meta_dict, \
  406. op_flag = STORAGE_SET_METADATA_FLAG_OVERWRITE):
  407. ret = 0
  408. conn = self.pool.get_connection()
  409. remote_filename_len = len(remote_filename)
  410. meta_buffer = fdfs_pack_metadata(meta_dict)
  411. meta_len = len(meta_buffer)
  412. th = Tracker_header()
  413. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + \
  414. FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
  415. th.cmd = STORAGE_PROTO_CMD_SET_METADATA
  416. try:
  417. th.send_header(conn)
  418. #meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
  419. # -filename(remote_filename_len)-meta(meta_len)|
  420. meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
  421. remote_filename_len, meta_len)
  422. send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, \
  423. op_flag, store_serv.group_name, \
  424. remote_filename, meta_buffer)
  425. tcp_send_data(conn, send_buffer)
  426. th.recv_header(conn)
  427. if th.status != 0 :
  428. ret = th.status
  429. except:
  430. raise
  431. finally:
  432. self.pool.release(conn)
  433. return ret
  434. def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
  435. store_conn = self.pool.get_connection()
  436. th = Tracker_header()
  437. remote_filename_len = len(remote_file_name)
  438. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
  439. th.cmd = STORAGE_PROTO_CMD_GET_METADATA
  440. try:
  441. th.send_header(store_conn)
  442. #meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
  443. meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
  444. send_buffer = struct.pack(meta_fmt, store_serv.group_name, remote_file_name)
  445. tcp_send_data(store_conn, send_buffer)
  446. th.recv_header(store_conn)
  447. #if th.status == 2:
  448. # raise DataError('[-] Error: Remote file %s has no meta data.' \
  449. # % (store_serv.group_name + os.sep + remote_file_name))
  450. if th.status != 0:
  451. raise DataError('[-] Error:%d, %s' % (th.status, os.strerror(th.status)))
  452. if th.pkg_len == 0:
  453. ret_dict = {}
  454. meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  455. except:
  456. raise
  457. finally:
  458. self.pool.release(store_conn)
  459. ret_dict = fdfs_unpack_metadata(meta_buffer)
  460. return ret_dict
  461. def _storage_do_append_file(self, tracker_client, store_serv, file_buffer, \
  462. file_size, upload_type, appended_filename):
  463. store_conn = self.pool.get_connection()
  464. th = Tracker_header()
  465. appended_filename_len = len(appended_filename)
  466. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
  467. th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
  468. try:
  469. th.send_header(store_conn)
  470. #append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
  471. # -filecontent(filesize)-|
  472. append_fmt = '!Q Q %ds' % appended_filename_len
  473. send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, \
  474. appended_filename)
  475. tcp_send_data(store_conn, send_buffer)
  476. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  477. tcp_send_file(store_conn, file_buffer)
  478. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  479. tcp_send_data(store_conn, file_buffer)
  480. elif upload_type == FDFS_UPLOAD_BY_FILE:
  481. tcp_send_file_ex(store_conn, file_buffer)
  482. th.recv_header(store_conn)
  483. if th.status != 0:
  484. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  485. except:
  486. raise
  487. finally:
  488. self.pool.release(store_conn)
  489. ret_dict = {}
  490. ret_dict['Status'] = 'Append file successed.'
  491. ret_dict['Appender file name'] = store_serv.group_name + os.sep + appended_filename
  492. ret_dict['Appended size'] = appromix(file_size)
  493. ret_dict['Storage IP'] = store_serv.ip_addr
  494. return ret_dict
  495. def storage_append_by_filename(self, tracker_client, store_serv, \
  496. local_filename, appended_filename):
  497. file_size = os.stat(local_filename).st_size
  498. return self._storage_do_append_file(tracker_client, store_serv, \
  499. local_filename, file_size, \
  500. FDFS_UPLOAD_BY_FILENAME, appended_filename)
  501. def storage_append_by_file(self, tracker_client, store_serv, \
  502. local_filename, appended_filename):
  503. file_size = os.stat(local_filename).st_size
  504. return self._storage_do_append_file(tracker_client, store_serv, \
  505. local_filename, file_size, \
  506. FDFS_UPLOAD_BY_FILE, appended_filename)
  507. def storage_append_by_buffer(self, tracker_client, store_serv, \
  508. file_buffer, appended_filename):
  509. file_size = len(file_buffer)
  510. return self._storage_do_append_file(tracker_client, store_serv, \
  511. file_buffer, file_size, \
  512. FDFS_UPLOAD_BY_BUFFER, appended_filename)
  513. def _storage_do_truncate_file(self, tracker_client, store_serv, \
  514. truncated_filesize, appender_filename):
  515. store_conn = self.pool.get_connection()
  516. th = Tracker_header()
  517. th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
  518. appender_filename_len = len(appender_filename)
  519. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
  520. try:
  521. th.send_header(store_conn)
  522. #truncate_fmt:|-appender_filename_len(8)-truncate_filesize(8)
  523. # -appender_filename(len)-|
  524. truncate_fmt = '!Q Q %ds' % appender_filename_len
  525. send_buffer = struct.pack(truncate_fmt, appender_filename_len, \
  526. truncated_filesize, appender_filename)
  527. tcp_send_data(store_conn, send_buffer)
  528. th.recv_header(store_conn)
  529. if th.status != 0:
  530. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  531. except:
  532. raise
  533. finally:
  534. self.pool.release(store_conn)
  535. ret_dict = {}
  536. ret_dict['Status'] = 'Truncate successed.'
  537. ret_dict['Storage IP'] = store_serv.ip_addr
  538. return ret_dict
  539. def storage_truncate_file(self, tracker_client, store_serv, \
  540. truncated_filesize, appender_filename):
  541. return self._storage_do_truncate_file(tracker_client, store_serv, \
  542. truncated_filesize, appender_filename)
  543. def _storage_do_modify_file(self, tracker_client, store_serv, upload_type, \
  544. filebuffer, offset, filesize, appender_filename):
  545. store_conn = self.pool.get_connection()
  546. th = Tracker_header()
  547. th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
  548. appender_filename_len = len(appender_filename)
  549. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
  550. try:
  551. th.send_header(store_conn)
  552. #modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
  553. modify_fmt = '!Q Q Q %ds' % appender_filename_len
  554. send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, \
  555. filesize, appender_filename)
  556. tcp_send_data(store_conn, send_buffer)
  557. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  558. upload_size = tcp_send_file(store_conn, filebuffer)
  559. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  560. tcp_send_data(store_conn, filebuffer)
  561. elif upload_type == FDFS_UPLOAD_BY_FILE:
  562. upload_size = tcp_send_file_ex(store_conn, filebuffer)
  563. th.recv_header(store_conn)
  564. if th.status != 0:
  565. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  566. except:
  567. raise
  568. finally:
  569. self.pool.release(store_conn)
  570. ret_dict = {}
  571. ret_dict['Status'] = 'Modify successed.'
  572. ret_dict['Storage IP'] = store_serv.ip_addr
  573. return ret_dict
  574. def storage_modify_by_filename(self, tracker_client, store_serv, \
  575. filename, offset, \
  576. filesize, appender_filename):
  577. return self._storage_do_modify_file(tracker_client, store_serv, \
  578. FDFS_UPLOAD_BY_FILENAME, filename, offset, \
  579. filesize, appender_filename)
  580. def storage_modify_by_file(self, tracker_client, store_serv, \
  581. filename, offset, \
  582. filesize, appender_filename):
  583. return self._storage_do_modify_file(tracker_client, store_serv, \
  584. FDFS_UPLOAD_BY_FILE, filename, offset, \
  585. filesize, appender_filename)
  586. def storage_modify_by_buffer(self, tracker_client, store_serv, \
  587. filebuffer, offset, \
  588. filesize, appender_filename):
  589. return self._storage_do_modify_file(tracker_client, store_serv, \
  590. FDFS_UPLOAD_BY_BUFFER, filebuffer, offset, \
  591. filesize, appender_filename)