#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: storage_client.py
import datetime
import errno
import os
import socket
import stat
import struct
import sys

from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
from fdfs_client.sendfile import *
from fdfs_client.exceptions import (
    FDFSError,
    ConnectionError,
    ResponseError,
    InvaildResponse,
    DataError
)
from fdfs_client.utils import *
  20. def tcp_send_file(conn, filename, buffer_size = 1024):
  21. '''
  22. Send file to server, and split into multiple pkgs while sending.
  23. arguments:
  24. @conn: connection
  25. @filename: string
  26. @buffer_size: int ,send buffer size
  27. @Return int: file size if success else raise ConnectionError.
  28. '''
  29. file_size = 0
  30. with open(filename, 'rb') as f:
  31. while 1:
  32. try:
  33. send_buffer = f.read(buffer_size)
  34. send_size = len(send_buffer)
  35. if send_size == 0:
  36. break
  37. tcp_send_data(conn, send_buffer)
  38. file_size += send_size
  39. except ConnectionError, e:
  40. raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
  41. except IOError, e:
  42. raise DataError('[-] Error while reading local file(%s).' % e.args)
  43. return file_size
  44. def tcp_send_file_ex(conn, filename, buffer_size = 4096):
  45. '''
  46. Send file to server. Using linux system call 'sendfile'.
  47. arguments:
  48. @conn: connection
  49. @filename: string
  50. @return long, sended size
  51. '''
  52. if 'linux' not in sys.platform.lower():
  53. raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
  54. nbytes = 0
  55. offset = 0
  56. sock_fd = conn.get_sock().fileno()
  57. with open(filename, 'rb') as f:
  58. in_fd = f.fileno()
  59. while 1:
  60. try:
  61. sent = sendfile(sock_fd, in_fd, offset, buffer_size)
  62. if 0 == sent:
  63. break
  64. nbytes += sent
  65. offset += sent
  66. except OSError, e:
  67. if e.errno == errno.EAGAIN:
  68. continue
  69. raise
  70. return nbytes
  71. def tcp_recv_file(conn, local_filename, file_size, buffer_size = 1024):
  72. '''
  73. Receive file from server, fragmented it while receiving and write to disk.
  74. arguments:
  75. @conn: connection
  76. @local_filename: string
  77. @file_size: int, remote file size
  78. @buffer_size: int, receive buffer size
  79. @Return int: file size if success else raise ConnectionError.
  80. '''
  81. total_file_size = 0
  82. flush_size = 0
  83. remain_bytes = file_size
  84. with open(local_filename, 'wb+') as f:
  85. while remain_bytes > 0:
  86. try:
  87. if remain_bytes >= buffer_size:
  88. file_buffer, recv_size = tcp_recv_response(conn, buffer_size, \
  89. buffer_size)
  90. else:
  91. file_buffer, recv_size = tcp_recv_response(conn, remain_bytes, \
  92. buffer_size)
  93. f.write(file_buffer)
  94. remain_bytes -= recv_size
  95. total_file_size += recv_size
  96. flush_size += recv_size
  97. if flush_size >= 4096:
  98. f.flush()
  99. flush_size = 0
  100. except ConnectionError, e:
  101. raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
  102. except IOError, e:
  103. raise DataError('[-] Error: while writting local file(%s).' % e.args)
  104. return total_file_size
  105. class Storage_client(object):
  106. '''
  107. The Class Storage_client for storage server.
  108. Note: argument host_tuple of storage server ip address, that should be a single element.
  109. '''
  110. def __init__(self, *kwargs):
  111. conn_kwargs = {
  112. 'name' : 'Storage Pool',
  113. 'host_tuple' : (kwargs[0],),
  114. 'port' : kwargs[1],
  115. 'timeout' : kwargs[2]
  116. }
  117. self.pool = ConnectionPool(**conn_kwargs)
  118. return None
  119. def __del__(self):
  120. try:
  121. self.pool.destroy()
  122. self.pool = None
  123. except:
  124. pass
  125. def update_pool(self, old_store_serv, new_store_serv, timeout = 30):
  126. '''
  127. Update connection pool of storage client.
  128. We need update connection pool of storage client, while storage server is changed.
  129. but if server not changed, we do nothing.
  130. '''
  131. if old_store_serv.ip_addr == new_store_serv.ip_addr:
  132. return None
  133. self.pool.destroy()
  134. conn_kwargs = {
  135. 'name' : 'Storage_pool',
  136. 'host_tuple' : (new_store_serv.ip_addr,),
  137. 'port' : new_store_serv.port,
  138. 'timeout' : timeout
  139. }
  140. self.pool = ConnectionPool(**conn_kwargs)
  141. return True
  142. def _storage_do_upload_file(self, tracker_client, store_serv, \
  143. file_buffer, file_size = None, upload_type = None, \
  144. meta_dict = None, cmd = None, master_filename = None, \
  145. prefix_name = None, file_ext_name = None):
  146. '''
  147. core of upload file.
  148. arguments:
  149. @tracker_client: Tracker_client, it is useful connect to tracker server
  150. @store_serv: Storage_server, it is return from query tracker server
  151. @file_buffer: string, file name or file buffer for send
  152. @file_size: int
  153. @upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
  154. FDFS_UPLOAD_BY_BUFFER
  155. @meta_dic: dictionary, store metadata in it
  156. @cmd: int, reference fdfs protol
  157. @master_filename: string, useful upload slave file
  158. @prefix_name: string
  159. @file_ext_name: string
  160. @Return dictionary
  161. {
  162. 'Group name' : group_name,
  163. 'Remote file_id' : remote_file_id,
  164. 'Status' : status,
  165. 'Local file name' : local_filename,
  166. 'Uploaded size' : upload_size,
  167. 'Storage IP' : storage_ip
  168. }
  169. '''
  170. store_conn = self.pool.get_connection()
  171. th = Tracker_header()
  172. master_filename_len = len(master_filename) if master_filename else 0
  173. prefix_name_len = len(prefix_name) if prefix_name else 0
  174. upload_slave = len(store_serv.group_name) and master_filename_len
  175. file_ext_name = str(file_ext_name) if file_ext_name else ''
  176. #non_slave_fmt |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
  177. non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
  178. #slave_fmt |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
  179. # -master_name(master_filename_len)-|
  180. slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, \
  181. FDFS_FILE_EXT_NAME_MAX_LEN, \
  182. master_filename_len)
  183. th.pkg_len = struct.calcsize(slave_fmt) if upload_slave \
  184. else struct.calcsize(non_slave_fmt)
  185. th.pkg_len += file_size
  186. th.cmd = cmd
  187. th.send_header(store_conn)
  188. if upload_slave:
  189. send_buffer = struct.pack(slave_fmt, master_filename_len, file_size, \
  190. prefix_name, file_ext_name, \
  191. master_filename)
  192. else:
  193. send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, \
  194. file_size, file_ext_name)
  195. try:
  196. tcp_send_data(store_conn, send_buffer)
  197. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  198. send_file_size = tcp_send_file(store_conn, file_buffer)
  199. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  200. tcp_send_data(store_conn, file_buffer)
  201. elif upload_type == FDFS_UPLOAD_BY_FILE:
  202. send_file_size = tcp_send_file_ex(store_conn, file_buffer)
  203. th.recv_header(store_conn)
  204. if th.status != 0:
  205. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  206. recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  207. if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
  208. errmsg = '[-] Error: Storage response length is not match, '
  209. errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
  210. raise ResponseError(errmsg)
  211. #recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
  212. recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
  213. th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
  214. (group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
  215. remote_filename = remote_name.strip('\x00')
  216. if meta_dict and len(meta_dict) > 0:
  217. status = self.storage_set_metadata(tracker_client, store_serv, \
  218. remote_filename, meta_dict)
  219. if status != 0:
  220. #rollback
  221. self.storage_delete_file(tracker_client, store_serv, remote_filename)
  222. raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
  223. except:
  224. raise
  225. finally:
  226. self.pool.release(store_conn)
  227. ret_dic = {
  228. 'Group name' : group_name.strip('\x00'),
  229. 'Remote file_id' : group_name.strip('\x00') + os.sep + \
  230. remote_filename,
  231. 'Status' : 'Upload successed.',
  232. 'Local file name' : file_buffer if (upload_type == FDFS_UPLOAD_BY_FILENAME \
  233. or upload_type == FDFS_UPLOAD_BY_FILE) \
  234. else '',
  235. 'Uploaded size' : appromix(send_file_size) if (upload_type == \
  236. FDFS_UPLOAD_BY_FILENAME or upload_type == \
  237. FDFS_UPLOAD_BY_FILE) else appromix( len(file_buffer)),
  238. 'Storage IP' : store_serv.ip_addr
  239. }
  240. return ret_dic
  241. def storage_upload_by_filename(self, tracker_client, store_serv, filename, \
  242. meta_dict = None):
  243. file_size = os.stat(filename).st_size
  244. file_ext_name = get_file_ext_name(filename)
  245. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  246. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  247. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  248. None, file_ext_name)
  249. def storage_upload_by_file(self, tracker_client, store_serv, filename, \
  250. meta_dict = None):
  251. file_size = os.stat(filename).st_size
  252. file_ext_name = get_file_ext_name(filename)
  253. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  254. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  255. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  256. None, file_ext_name)
  257. def storage_upload_by_buffer(self, tracker_client, store_serv, \
  258. file_buffer, file_ext_name = None, meta_dict = None):
  259. buffer_size = len(file_buffer)
  260. return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
  261. buffer_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
  262. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  263. None, file_ext_name)
  264. def storage_upload_slave_by_filename(self, tracker_client, store_serv, \
  265. filename, prefix_name, remote_filename, \
  266. meta_dict = None):
  267. file_size = os.stat(filename).st_size
  268. file_ext_name = get_file_ext_name(filename)
  269. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  270. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  271. STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  272. remote_filename, prefix_name, \
  273. file_ext_name)
  274. def storage_upload_slave_by_file(self, tracker_client, store_serv, \
  275. filename, prefix_name, remote_filename, \
  276. meta_dict = None):
  277. file_size = os.stat(filename).st_size
  278. file_ext_name = get_file_ext_name(filename)
  279. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  280. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  281. STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  282. remote_filename, prefix_name, \
  283. file_ext_name)
  284. def storage_upload_slave_by_buffer(self, tracker_client, store_serv, \
  285. filebuffer, remote_filename, meta_dict, \
  286. file_ext_name):
  287. file_size = len(filebuffer)
  288. return self._storage_do_upload_file(tracker_client, store_serv, \
  289. filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER, \
  290. meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  291. None, remote_filename, file_ext_name)
  292. def storage_upload_appender_by_filename(self, tracker_client, store_serv, \
  293. filename, meta_dict = None):
  294. file_size = os.stat(filename).st_size
  295. file_ext_name = get_file_ext_name(filename)
  296. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  297. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  298. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  299. None, None, file_ext_name)
  300. def storage_upload_appender_by_file(self, tracker_client, store_serv, \
  301. filename, meta_dict = None):
  302. file_size = os.stat(filename).st_size
  303. file_ext_name = get_file_ext_name(filename)
  304. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  305. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  306. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  307. None, None, file_ext_name)
  308. def storage_upload_appender_by_buffer(self, tracker_client, store_serv, \
  309. file_buffer, meta_dict = None, \
  310. file_ext_name = None):
  311. file_size = len(file_buffer)
  312. return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
  313. file_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
  314. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  315. None, None, file_ext_name)
  316. def storage_delete_file(self, tracker_client, store_serv, remote_filename):
  317. '''
  318. Delete file from storage server.
  319. '''
  320. store_conn = self.pool.get_connection()
  321. th = Tracker_header()
  322. th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
  323. file_name_len = len(remote_filename)
  324. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
  325. try:
  326. th.send_header(store_conn)
  327. #del_fmt: |-group_name(16)-filename(len)-|
  328. del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
  329. send_buffer = struct.pack(del_fmt, store_serv.group_name, remote_filename)
  330. tcp_send_data(store_conn, send_buffer)
  331. th.recv_header(store_conn)
  332. #if th.status == 2:
  333. # raise DataError('[-] Error: remote file %s is not exist.' \
  334. # % (store_serv.group_name + os.sep + remote_filename))
  335. if th.status != 0:
  336. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  337. #recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  338. except:
  339. raise
  340. finally:
  341. self.pool.release(store_conn)
  342. remote_filename = store_serv.group_name + os.sep + remote_filename
  343. return ('Delete file successed.', remote_filename, store_serv.ip_addr)
  344. def _storage_do_download_file(self, tracker_client, store_serv, file_buffer, \
  345. offset, download_size, download_type, remote_filename):
  346. '''
  347. Core of download file from storage server.
  348. You can choice download type, optional FDFS_DOWNLOAD_TO_FILE or
  349. FDFS_DOWNLOAD_TO_BUFFER. And you can choice file offset.
  350. @Return dictionary
  351. 'Remote file name' : remote_filename,
  352. 'Content' : local_filename or buffer,
  353. 'Download size' : download_size,
  354. 'Storage IP' : storage_ip
  355. '''
  356. store_conn = self.pool.get_connection()
  357. th = Tracker_header()
  358. remote_filename_len = len(remote_filename)
  359. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + \
  360. remote_filename_len
  361. th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
  362. try:
  363. th.send_header(store_conn)
  364. #down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
  365. down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
  366. send_buffer = struct.pack(down_fmt, offset, download_size, \
  367. store_serv.group_name, remote_filename)
  368. tcp_send_data(store_conn, send_buffer)
  369. th.recv_header(store_conn)
  370. #if th.status == 2:
  371. # raise DataError('[-] Error: remote file %s is not exist.' %
  372. # (store_serv.group_name + os.sep + remote_filename))
  373. if th.status != 0:
  374. raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
  375. if download_type == FDFS_DOWNLOAD_TO_FILE:
  376. total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
  377. elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
  378. recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
  379. except:
  380. raise
  381. finally:
  382. self.pool.release(store_conn)
  383. ret_dic = {
  384. 'Remote file_id' : store_serv.group_name + os.sep + remote_filename,
  385. 'Content' : file_buffer if download_type == \
  386. FDFS_DOWNLOAD_TO_FILE else recv_buffer,
  387. 'Download size' : appromix(total_recv_size),
  388. 'Storage IP' : store_serv.ip_addr
  389. }
  390. return ret_dic
  391. def storage_download_to_file(self, tracker_client, store_serv, local_filename, \
  392. file_offset, download_bytes, remote_filename):
  393. return self._storage_do_download_file(tracker_client, store_serv, local_filename, \
  394. file_offset, download_bytes, \
  395. FDFS_DOWNLOAD_TO_FILE, remote_filename)
  396. def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, \
  397. file_offset, download_bytes, remote_filename):
  398. return self._storage_do_download_file(tracker_client, store_serv, file_buffer, \
  399. file_offset, download_bytes, \
  400. FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
  401. def storage_set_metadata(self, tracker_client, store_serv, \
  402. remote_filename, meta_dict, \
  403. op_flag = STORAGE_SET_METADATA_FLAG_OVERWRITE):
  404. ret = 0
  405. conn = self.pool.get_connection()
  406. remote_filename_len = len(remote_filename)
  407. meta_buffer = fdfs_pack_metadata(meta_dict)
  408. meta_len = len(meta_buffer)
  409. th = Tracker_header()
  410. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + \
  411. FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
  412. th.cmd = STORAGE_PROTO_CMD_SET_METADATA
  413. try:
  414. th.send_header(conn)
  415. #meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
  416. # -filename(remote_filename_len)-meta(meta_len)|
  417. meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
  418. remote_filename_len, meta_len)
  419. send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, \
  420. op_flag, store_serv.group_name, \
  421. remote_filename, meta_buffer)
  422. tcp_send_data(conn, send_buffer)
  423. th.recv_header(conn)
  424. if th.status != 0 :
  425. ret = th.status
  426. except:
  427. raise
  428. finally:
  429. self.pool.release(conn)
  430. return ret
  431. def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
  432. store_conn = self.pool.get_connection()
  433. th = Tracker_header()
  434. remote_filename_len = len(remote_file_name)
  435. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
  436. th.cmd = STORAGE_PROTO_CMD_GET_METADATA
  437. try:
  438. th.send_header(store_conn)
  439. #meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
  440. meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
  441. send_buffer = struct.pack(meta_fmt, store_serv.group_name, remote_file_name)
  442. tcp_send_data(store_conn, send_buffer)
  443. th.recv_header(store_conn)
  444. #if th.status == 2:
  445. # raise DataError('[-] Error: Remote file %s has no meta data.' \
  446. # % (store_serv.group_name + os.sep + remote_file_name))
  447. if th.status != 0:
  448. raise DataError('[-] Error:%d, %s' % (th.status, os.strerror(th.status)))
  449. if th.pkg_len == 0:
  450. ret_dict = {}
  451. meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  452. except:
  453. raise
  454. finally:
  455. self.pool.release(store_conn)
  456. ret_dict = fdfs_unpack_metadata(meta_buffer)
  457. return ret_dict
  458. def _storage_do_append_file(self, tracker_client, store_serv, file_buffer, \
  459. file_size, upload_type, appended_filename):
  460. store_conn = self.pool.get_connection()
  461. th = Tracker_header()
  462. appended_filename_len = len(appended_filename)
  463. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
  464. th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
  465. try:
  466. th.send_header(store_conn)
  467. #append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
  468. # -filecontent(filesize)-|
  469. append_fmt = '!Q Q %ds' % appended_filename_len
  470. send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, \
  471. appended_filename)
  472. tcp_send_data(store_conn, send_buffer)
  473. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  474. tcp_send_file(store_conn, file_buffer)
  475. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  476. tcp_send_data(store_conn, file_buffer)
  477. elif upload_type == FDFS_UPLOAD_BY_FILE:
  478. tcp_send_file_ex(store_conn, file_buffer)
  479. th.recv_header(store_conn)
  480. if th.status != 0:
  481. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  482. except:
  483. raise
  484. finally:
  485. self.pool.release(store_conn)
  486. ret_dict = {}
  487. ret_dict['Status'] = 'Append file successed.'
  488. ret_dict['Appender file name'] = store_serv.group_name + os.sep + appended_filename
  489. ret_dict['Appended size'] = appromix(file_size)
  490. ret_dict['Storage IP'] = store_serv.ip_addr
  491. return ret_dict
  492. def storage_append_by_filename(self, tracker_client, store_serv, \
  493. local_filename, appended_filename):
  494. file_size = os.stat(local_filename).st_size
  495. return self._storage_do_append_file(tracker_client, store_serv, \
  496. local_filename, file_size, \
  497. FDFS_UPLOAD_BY_FILENAME, appended_filename)
  498. def storage_append_by_file(self, tracker_client, store_serv, \
  499. local_filename, appended_filename):
  500. file_size = os.stat(local_filename).st_size
  501. return self._storage_do_append_file(tracker_client, store_serv, \
  502. local_filename, file_size, \
  503. FDFS_UPLOAD_BY_FILE, appended_filename)
  504. def storage_append_by_buffer(self, tracker_client, store_serv, \
  505. file_buffer, appended_filename):
  506. file_size = len(file_buffer)
  507. return self._storage_do_append_file(tracker_client, store_serv, \
  508. file_buffer, file_size, \
  509. FDFS_UPLOAD_BY_BUFFER, appended_filename)
  510. def _storage_do_truncate_file(self, tracker_client, store_serv, \
  511. truncated_filesize, appender_filename):
  512. store_conn = self.pool.get_connection()
  513. th = Tracker_header()
  514. th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
  515. appender_filename_len = len(appender_filename)
  516. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
  517. try:
  518. th.send_header(store_conn)
  519. #truncate_fmt:|-appender_filename_len(8)-truncate_filesize(8)
  520. # -appender_filename(len)-|
  521. truncate_fmt = '!Q Q %ds' % appender_filename_len
  522. send_buffer = struct.pack(truncate_fmt, appender_filename_len, \
  523. truncated_filesize, appender_filename)
  524. tcp_send_data(store_conn, send_buffer)
  525. th.recv_header(store_conn)
  526. if th.status != 0:
  527. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  528. except:
  529. raise
  530. finally:
  531. self.pool.release(store_conn)
  532. ret_dict = {}
  533. ret_dict['Status'] = 'Truncate successed.'
  534. ret_dict['Storage IP'] = store_serv.ip_addr
  535. return ret_dict
  536. def storage_truncate_file(self, tracker_client, store_serv, \
  537. truncated_filesize, appender_filename):
  538. return self._storage_do_truncate_file(tracker_client, store_serv, \
  539. truncated_filesize, appender_filename)
  540. def _storage_do_modify_file(self, tracker_client, store_serv, upload_type, \
  541. filebuffer, offset, filesize, appender_filename):
  542. store_conn = self.pool.get_connection()
  543. th = Tracker_header()
  544. th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
  545. appender_filename_len = len(appender_filename)
  546. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
  547. try:
  548. th.send_header(store_conn)
  549. #modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
  550. modify_fmt = '!Q Q Q %ds' % appender_filename_len
  551. send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, \
  552. filesize, appender_filename)
  553. tcp_send_data(store_conn, send_buffer)
  554. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  555. upload_size = tcp_send_file(store_conn, filebuffer)
  556. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  557. tcp_send_data(store_conn, filebuffer)
  558. elif upload_type == FDFS_UPLOAD_BY_FILE:
  559. upload_size = tcp_send_file_ex(store_conn, filebuffer)
  560. th.recv_header(store_conn)
  561. if th.status != 0:
  562. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  563. except:
  564. raise
  565. finally:
  566. self.pool.release(store_conn)
  567. ret_dict = {}
  568. ret_dict['Status'] = 'Modify successed.'
  569. ret_dict['Storage IP'] = store_serv.ip_addr
  570. return ret_dict
  571. def storage_modify_by_filename(self, tracker_client, store_serv, \
  572. filename, offset, \
  573. filesize, appender_filename):
  574. return self._storage_do_modify_file(tracker_client, store_serv, \
  575. FDFS_UPLOAD_BY_FILENAME, filename, offset, \
  576. filesize, appender_filename)
  577. def storage_modify_by_file(self, tracker_client, store_serv, \
  578. filename, offset, \
  579. filesize, appender_filename):
  580. return self._storage_do_modify_file(tracker_client, store_serv, \
  581. FDFS_UPLOAD_BY_FILE, filename, offset, \
  582. filesize, appender_filename)
  583. def storage_modify_by_buffer(self, tracker_client, store_serv, \
  584. filebuffer, offset, \
  585. filesize, appender_filename):
  586. return self._storage_do_modify_file(tracker_client, store_serv, \
  587. FDFS_UPLOAD_BY_BUFFER, filebuffer, offset, \
  588. filesize, appender_filename)