storage_client.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # filename: storage_client.py
  4. import os, stat
  5. import struct
  6. import socket
  7. import datetime
  8. import errno
  9. from fdfs_client.fdfs_protol import *
  10. from fdfs_client.connection import *
  11. from fdfs_client.sendfile import *
  12. from fdfs_client.exceptions import (
  13. FDFSError,
  14. ConnectionError,
  15. ResponseError,
  16. InvaildResponse,
  17. DataError
  18. )
  19. from fdfs_client.utils import *
  20. def tcp_send_file(conn, filename, buffer_size = 1024):
  21. '''
  22. Send file to server, and split into multiple pkgs while sending.
  23. arguments:
  24. @conn: connection
  25. @filename: string
  26. @buffer_size: int ,send buffer size
  27. @Return int: file size if success else raise ConnectionError.
  28. '''
  29. file_size = 0
  30. with open(filename, 'rb') as f:
  31. while 1:
  32. try:
  33. send_buffer = f.read(buffer_size)
  34. send_size = len(send_buffer)
  35. if send_size == 0:
  36. break
  37. tcp_send_data(conn, send_buffer)
  38. file_size += send_size
  39. except ConnectionError, e:
  40. raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
  41. except IOError, e:
  42. raise DataError('[-] Error while reading local file(%s).' % e.args)
  43. return file_size
  44. def tcp_send_file_ex(conn, filename, buffer_size = 4096):
  45. '''
  46. Send file to server. Using linux system call 'sendfile'.
  47. arguments:
  48. @conn: connection
  49. @filename: string
  50. @return long, sended size
  51. '''
  52. if 'linux' not in sys.platform.lower():
  53. raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
  54. nbytes = 0
  55. offset = 0
  56. sock_fd = conn.get_sock().fileno()
  57. with open(filename, 'rb') as f:
  58. in_fd = f.fileno()
  59. while 1:
  60. try:
  61. sent = sendfile(sock_fd, in_fd, offset, buffer_size)
  62. if 0 == sent:
  63. break
  64. nbytes += sent
  65. offset += sent
  66. except OSError, e:
  67. if e.errno == errno.EAGAIN:
  68. continue
  69. raise
  70. return nbytes
  71. def tcp_recv_file(conn, local_filename, file_size, buffer_size = 1024):
  72. '''
  73. Receive file from server, fragmented it while receiving and write to disk.
  74. arguments:
  75. @conn: connection
  76. @local_filename: string
  77. @file_size: int, remote file size
  78. @buffer_size: int, receive buffer size
  79. @Return int: file size if success else raise ConnectionError.
  80. '''
  81. total_file_size = 0
  82. flush_size = 0
  83. remain_bytes = file_size
  84. with open(local_filename, 'wb+') as f:
  85. while remain_bytes > 0:
  86. try:
  87. if remain_bytes >= buffer_size:
  88. file_buffer, recv_size = tcp_recv_response(conn, buffer_size, \
  89. buffer_size)
  90. else:
  91. file_buffer, recv_size = tcp_recv_response(conn, remain_bytes, \
  92. buffer_size)
  93. f.write(file_buffer)
  94. remain_bytes -= buffer_size
  95. total_file_size += recv_size
  96. flush_size += recv_size
  97. if flush_size >= 4096:
  98. f.flush()
  99. flush_size = 0
  100. except ConnectionError, e:
  101. raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
  102. except IOError, e:
  103. raise DataError('[-] Error: while writting local file(%s).' % e.args)
  104. return total_file_size
  105. class Storage_client(object):
  106. '''
  107. The Class Storage_client for storage server.
  108. Note: argument host_tuple of storage server ip address, that should be a single element.
  109. '''
  110. def __init__(self, *kwargs):
  111. conn_kwargs = {
  112. 'name' : 'Storage Pool',
  113. 'host_tuple' : ((kwargs[0],kwargs[1]),),
  114. 'timeout' : kwargs[2]
  115. }
  116. self.pool = ConnectionPool(**conn_kwargs)
  117. return None
  118. def __del__(self):
  119. try:
  120. self.pool.destroy()
  121. self.pool = None
  122. except:
  123. pass
  124. def update_pool(self, old_store_serv, new_store_serv, timeout = 30):
  125. '''
  126. Update connection pool of storage client.
  127. We need update connection pool of storage client, while storage server is changed.
  128. but if server not changed, we do nothing.
  129. '''
  130. if old_store_serv.ip_addr == new_store_serv.ip_addr:
  131. return None
  132. self.pool.destroy()
  133. conn_kwargs = {
  134. 'name' : 'Storage_pool',
  135. 'host_tuple' : ((new_store_serv.ip_addr,new_store_serv.port),),
  136. 'timeout' : timeout
  137. }
  138. self.pool = ConnectionPool(**conn_kwargs)
  139. return True
  140. def _storage_do_upload_file(self, tracker_client, store_serv, \
  141. file_buffer, file_size = None, upload_type = None, \
  142. meta_dict = None, cmd = None, master_filename = None, \
  143. prefix_name = None, file_ext_name = None):
  144. '''
  145. core of upload file.
  146. arguments:
  147. @tracker_client: Tracker_client, it is useful connect to tracker server
  148. @store_serv: Storage_server, it is return from query tracker server
  149. @file_buffer: string, file name or file buffer for send
  150. @file_size: int
  151. @upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
  152. FDFS_UPLOAD_BY_BUFFER
  153. @meta_dic: dictionary, store metadata in it
  154. @cmd: int, reference fdfs protol
  155. @master_filename: string, useful upload slave file
  156. @prefix_name: string
  157. @file_ext_name: string
  158. @Return dictionary
  159. {
  160. 'Group name' : group_name,
  161. 'Remote file_id' : remote_file_id,
  162. 'Status' : status,
  163. 'Local file name' : local_filename,
  164. 'Uploaded size' : upload_size,
  165. 'Storage IP' : storage_ip
  166. }
  167. '''
  168. store_conn = self.pool.get_connection()
  169. th = Tracker_header()
  170. master_filename_len = len(master_filename) if master_filename else 0
  171. prefix_name_len = len(prefix_name) if prefix_name else 0
  172. upload_slave = len(store_serv.group_name) and master_filename_len
  173. file_ext_name = str(file_ext_name) if file_ext_name else ''
  174. #non_slave_fmt |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
  175. non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
  176. #slave_fmt |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
  177. # -master_name(master_filename_len)-|
  178. slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, \
  179. FDFS_FILE_EXT_NAME_MAX_LEN, \
  180. master_filename_len)
  181. th.pkg_len = struct.calcsize(slave_fmt) if upload_slave \
  182. else struct.calcsize(non_slave_fmt)
  183. th.pkg_len += file_size
  184. th.cmd = cmd
  185. th.send_header(store_conn)
  186. if upload_slave:
  187. send_buffer = struct.pack(slave_fmt, master_filename_len, file_size, \
  188. prefix_name, file_ext_name, \
  189. master_filename)
  190. else:
  191. send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, \
  192. file_size, file_ext_name)
  193. try:
  194. tcp_send_data(store_conn, send_buffer)
  195. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  196. send_file_size = tcp_send_file(store_conn, file_buffer)
  197. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  198. tcp_send_data(store_conn, file_buffer)
  199. elif upload_type == FDFS_UPLOAD_BY_FILE:
  200. send_file_size = tcp_send_file_ex(store_conn, file_buffer)
  201. th.recv_header(store_conn)
  202. if th.status != 0:
  203. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  204. recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  205. if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
  206. errmsg = '[-] Error: Storage response length is not match, '
  207. errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
  208. raise ResponseError(errmsg)
  209. #recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
  210. recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
  211. th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
  212. (group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
  213. remote_filename = remote_name.strip('\x00')
  214. if meta_dict and len(meta_dict) > 0:
  215. status = self.storage_set_metadata(tracker_client, store_serv, \
  216. remote_filename, meta_dict)
  217. if status != 0:
  218. #rollback
  219. self.storage_delete_file(tracker_client, store_serv, remote_filename)
  220. raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
  221. except:
  222. raise
  223. finally:
  224. self.pool.release(store_conn)
  225. ret_dic = {
  226. 'Group name' : group_name.strip('\x00'),
  227. 'Remote file_id' : group_name.strip('\x00') + os.sep + \
  228. remote_filename,
  229. 'Status' : 'Upload successed.',
  230. 'Local file name' : file_buffer if (upload_type == FDFS_UPLOAD_BY_FILENAME \
  231. or upload_type == FDFS_UPLOAD_BY_FILE) \
  232. else '',
  233. 'Uploaded size' : appromix(send_file_size) if (upload_type == \
  234. FDFS_UPLOAD_BY_FILENAME or upload_type == \
  235. FDFS_UPLOAD_BY_FILE) else appromix( len(file_buffer)),
  236. 'Storage IP' : store_serv.ip_addr
  237. }
  238. return ret_dic
  239. def storage_upload_by_filename(self, tracker_client, store_serv, filename, \
  240. meta_dict = None):
  241. file_size = os.stat(filename).st_size
  242. file_ext_name = get_file_ext_name(filename)
  243. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  244. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  245. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  246. None, file_ext_name)
  247. def storage_upload_by_file(self, tracker_client, store_serv, filename, \
  248. meta_dict = None):
  249. file_size = os.stat(filename).st_size
  250. file_ext_name = get_file_ext_name(filename)
  251. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  252. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  253. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  254. None, file_ext_name)
  255. def storage_upload_by_buffer(self, tracker_client, store_serv, \
  256. file_buffer, file_ext_name = None, meta_dict = None):
  257. buffer_size = len(file_buffer)
  258. return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
  259. buffer_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
  260. STORAGE_PROTO_CMD_UPLOAD_FILE, None, \
  261. None, file_ext_name)
  262. def storage_upload_slave_by_filename(self, tracker_client, store_serv, \
  263. filename, prefix_name, remote_filename, \
  264. meta_dict = None):
  265. file_size = os.stat(filename).st_size
  266. file_ext_name = get_file_ext_name(filename)
  267. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  268. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  269. STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  270. remote_filename, prefix_name, \
  271. file_ext_name)
  272. def storage_upload_slave_by_file(self, tracker_client, store_serv, \
  273. filename, prefix_name, remote_filename, \
  274. meta_dict = None):
  275. file_size = os.stat(filename).st_size
  276. file_ext_name = get_file_ext_name(filename)
  277. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  278. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  279. STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  280. remote_filename, prefix_name, \
  281. file_ext_name)
  282. def storage_upload_slave_by_buffer(self, tracker_client, store_serv, \
  283. filebuffer, remote_filename, meta_dict, \
  284. file_ext_name):
  285. file_size = len(filebuffer)
  286. return self._storage_do_upload_file(tracker_client, store_serv, \
  287. filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER, \
  288. meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, \
  289. None, remote_filename, file_ext_name)
  290. def storage_upload_appender_by_filename(self, tracker_client, store_serv, \
  291. filename, meta_dict = None):
  292. file_size = os.stat(filename).st_size
  293. file_ext_name = get_file_ext_name(filename)
  294. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  295. file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict, \
  296. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  297. None, None, file_ext_name)
  298. def storage_upload_appender_by_file(self, tracker_client, store_serv, \
  299. filename, meta_dict = None):
  300. file_size = os.stat(filename).st_size
  301. file_ext_name = get_file_ext_name(filename)
  302. return self._storage_do_upload_file(tracker_client, store_serv, filename, \
  303. file_size, FDFS_UPLOAD_BY_FILE, meta_dict, \
  304. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  305. None, None, file_ext_name)
  306. def storage_upload_appender_by_buffer(self, tracker_client, store_serv, \
  307. file_buffer, meta_dict = None, \
  308. file_ext_name = None):
  309. file_size = len(file_buffer)
  310. return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, \
  311. file_size, FDFS_UPLOAD_BY_BUFFER, meta_dict, \
  312. STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, \
  313. None, None, file_ext_name)
  314. def storage_delete_file(self, tracker_client, store_serv, remote_filename):
  315. '''
  316. Delete file from storage server.
  317. '''
  318. store_conn = self.pool.get_connection()
  319. th = Tracker_header()
  320. th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
  321. file_name_len = len(remote_filename)
  322. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
  323. try:
  324. th.send_header(store_conn)
  325. #del_fmt: |-group_name(16)-filename(len)-|
  326. del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
  327. send_buffer = struct.pack(del_fmt, store_serv.group_name, remote_filename)
  328. tcp_send_data(store_conn, send_buffer)
  329. th.recv_header(store_conn)
  330. #if th.status == 2:
  331. # raise DataError('[-] Error: remote file %s is not exist.' \
  332. # % (store_serv.group_name + os.sep + remote_filename))
  333. if th.status != 0:
  334. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  335. #recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  336. except:
  337. raise
  338. finally:
  339. self.pool.release(store_conn)
  340. remote_filename = store_serv.group_name + os.sep + remote_filename
  341. return ('Delete file successed.', remote_filename, store_serv.ip_addr)
  342. def _storage_do_download_file(self, tracker_client, store_serv, file_buffer, \
  343. offset, download_size, download_type, remote_filename):
  344. '''
  345. Core of download file from storage server.
  346. You can choice download type, optional FDFS_DOWNLOAD_TO_FILE or
  347. FDFS_DOWNLOAD_TO_BUFFER. And you can choice file offset.
  348. @Return dictionary
  349. 'Remote file name' : remote_filename,
  350. 'Content' : local_filename or buffer,
  351. 'Download size' : download_size,
  352. 'Storage IP' : storage_ip
  353. '''
  354. store_conn = self.pool.get_connection()
  355. th = Tracker_header()
  356. remote_filename_len = len(remote_filename)
  357. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + \
  358. remote_filename_len
  359. th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
  360. try:
  361. th.send_header(store_conn)
  362. #down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
  363. down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
  364. send_buffer = struct.pack(down_fmt, offset, download_size, \
  365. store_serv.group_name, remote_filename)
  366. tcp_send_data(store_conn, send_buffer)
  367. th.recv_header(store_conn)
  368. #if th.status == 2:
  369. # raise DataError('[-] Error: remote file %s is not exist.' %
  370. # (store_serv.group_name + os.sep + remote_filename))
  371. if th.status != 0:
  372. raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
  373. if download_type == FDFS_DOWNLOAD_TO_FILE:
  374. total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
  375. elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
  376. recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
  377. except:
  378. raise
  379. finally:
  380. self.pool.release(store_conn)
  381. ret_dic = {
  382. 'Remote file_id' : store_serv.group_name + os.sep + remote_filename,
  383. 'Content' : file_buffer if download_type == \
  384. FDFS_DOWNLOAD_TO_FILE else recv_buffer,
  385. 'Download size' : appromix(total_recv_size),
  386. 'Storage IP' : store_serv.ip_addr
  387. }
  388. return ret_dic
  389. def storage_download_to_file(self, tracker_client, store_serv, local_filename, \
  390. file_offset, download_bytes, remote_filename):
  391. return self._storage_do_download_file(tracker_client, store_serv, local_filename, \
  392. file_offset, download_bytes, \
  393. FDFS_DOWNLOAD_TO_FILE, remote_filename)
  394. def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, \
  395. file_offset, download_bytes, remote_filename):
  396. return self._storage_do_download_file(tracker_client, store_serv, file_buffer, \
  397. file_offset, download_bytes, \
  398. FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
  399. def storage_set_metadata(self, tracker_client, store_serv, \
  400. remote_filename, meta_dict, \
  401. op_flag = STORAGE_SET_METADATA_FLAG_OVERWRITE):
  402. ret = 0
  403. conn = self.pool.get_connection()
  404. remote_filename_len = len(remote_filename)
  405. meta_buffer = fdfs_pack_metadata(meta_dict)
  406. meta_len = len(meta_buffer)
  407. th = Tracker_header()
  408. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + \
  409. FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
  410. th.cmd = STORAGE_PROTO_CMD_SET_METADATA
  411. try:
  412. th.send_header(conn)
  413. #meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
  414. # -filename(remote_filename_len)-meta(meta_len)|
  415. meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, \
  416. remote_filename_len, meta_len)
  417. send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, \
  418. op_flag, store_serv.group_name, \
  419. remote_filename, meta_buffer)
  420. tcp_send_data(conn, send_buffer)
  421. th.recv_header(conn)
  422. if th.status != 0 :
  423. ret = th.status
  424. except:
  425. raise
  426. finally:
  427. self.pool.release(conn)
  428. return ret
  429. def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
  430. store_conn = self.pool.get_connection()
  431. th = Tracker_header()
  432. remote_filename_len = len(remote_file_name)
  433. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
  434. th.cmd = STORAGE_PROTO_CMD_GET_METADATA
  435. try:
  436. th.send_header(store_conn)
  437. #meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
  438. meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
  439. send_buffer = struct.pack(meta_fmt, store_serv.group_name, remote_file_name)
  440. tcp_send_data(store_conn, send_buffer)
  441. th.recv_header(store_conn)
  442. #if th.status == 2:
  443. # raise DataError('[-] Error: Remote file %s has no meta data.' \
  444. # % (store_serv.group_name + os.sep + remote_file_name))
  445. if th.status != 0:
  446. raise DataError('[-] Error:%d, %s' % (th.status, os.strerror(th.status)))
  447. if th.pkg_len == 0:
  448. ret_dict = {}
  449. meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
  450. except:
  451. raise
  452. finally:
  453. self.pool.release(store_conn)
  454. ret_dict = fdfs_unpack_metadata(meta_buffer)
  455. return ret_dict
  456. def _storage_do_append_file(self, tracker_client, store_serv, file_buffer, \
  457. file_size, upload_type, appended_filename):
  458. store_conn = self.pool.get_connection()
  459. th = Tracker_header()
  460. appended_filename_len = len(appended_filename)
  461. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
  462. th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
  463. try:
  464. th.send_header(store_conn)
  465. #append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
  466. # -filecontent(filesize)-|
  467. append_fmt = '!Q Q %ds' % appended_filename_len
  468. send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, \
  469. appended_filename)
  470. tcp_send_data(store_conn, send_buffer)
  471. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  472. tcp_send_file(store_conn, file_buffer)
  473. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  474. tcp_send_data(store_conn, file_buffer)
  475. elif upload_type == FDFS_UPLOAD_BY_FILE:
  476. tcp_send_file_ex(store_conn, file_buffer)
  477. th.recv_header(store_conn)
  478. if th.status != 0:
  479. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  480. except:
  481. raise
  482. finally:
  483. self.pool.release(store_conn)
  484. ret_dict = {}
  485. ret_dict['Status'] = 'Append file successed.'
  486. ret_dict['Appender file name'] = store_serv.group_name + os.sep + appended_filename
  487. ret_dict['Appended size'] = appromix(file_size)
  488. ret_dict['Storage IP'] = store_serv.ip_addr
  489. return ret_dict
  490. def storage_append_by_filename(self, tracker_client, store_serv, \
  491. local_filename, appended_filename):
  492. file_size = os.stat(local_filename).st_size
  493. return self._storage_do_append_file(tracker_client, store_serv, \
  494. local_filename, file_size, \
  495. FDFS_UPLOAD_BY_FILENAME, appended_filename)
  496. def storage_append_by_file(self, tracker_client, store_serv, \
  497. local_filename, appended_filename):
  498. file_size = os.stat(local_filename).st_size
  499. return self._storage_do_append_file(tracker_client, store_serv, \
  500. local_filename, file_size, \
  501. FDFS_UPLOAD_BY_FILE, appended_filename)
  502. def storage_append_by_buffer(self, tracker_client, store_serv, \
  503. file_buffer, appended_filename):
  504. file_size = len(file_buffer)
  505. return self._storage_do_append_file(tracker_client, store_serv, \
  506. file_buffer, file_size, \
  507. FDFS_UPLOAD_BY_BUFFER, appended_filename)
  508. def _storage_do_truncate_file(self, tracker_client, store_serv, \
  509. truncated_filesize, appender_filename):
  510. store_conn = self.pool.get_connection()
  511. th = Tracker_header()
  512. th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
  513. appender_filename_len = len(appender_filename)
  514. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
  515. try:
  516. th.send_header(store_conn)
  517. #truncate_fmt:|-appender_filename_len(8)-truncate_filesize(8)
  518. # -appender_filename(len)-|
  519. truncate_fmt = '!Q Q %ds' % appender_filename_len
  520. send_buffer = struct.pack(truncate_fmt, appender_filename_len, \
  521. truncated_filesize, appender_filename)
  522. tcp_send_data(store_conn, send_buffer)
  523. th.recv_header(store_conn)
  524. if th.status != 0:
  525. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  526. except:
  527. raise
  528. finally:
  529. self.pool.release(store_conn)
  530. ret_dict = {}
  531. ret_dict['Status'] = 'Truncate successed.'
  532. ret_dict['Storage IP'] = store_serv.ip_addr
  533. return ret_dict
  534. def storage_truncate_file(self, tracker_client, store_serv, \
  535. truncated_filesize, appender_filename):
  536. return self._storage_do_truncate_file(tracker_client, store_serv, \
  537. truncated_filesize, appender_filename)
  538. def _storage_do_modify_file(self, tracker_client, store_serv, upload_type, \
  539. filebuffer, offset, filesize, appender_filename):
  540. store_conn = self.pool.get_connection()
  541. th = Tracker_header()
  542. th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
  543. appender_filename_len = len(appender_filename)
  544. th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
  545. try:
  546. th.send_header(store_conn)
  547. #modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
  548. modify_fmt = '!Q Q Q %ds' % appender_filename_len
  549. send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, \
  550. filesize, appender_filename)
  551. tcp_send_data(store_conn, send_buffer)
  552. if upload_type == FDFS_UPLOAD_BY_FILENAME:
  553. upload_size = tcp_send_file(store_conn, filebuffer)
  554. elif upload_type == FDFS_UPLOAD_BY_BUFFER:
  555. tcp_send_data(store_conn, filebuffer)
  556. elif upload_type == FDFS_UPLOAD_BY_FILE:
  557. upload_size = tcp_send_file_ex(store_conn, filebuffer)
  558. th.recv_header(store_conn)
  559. if th.status != 0:
  560. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  561. except:
  562. raise
  563. finally:
  564. self.pool.release(store_conn)
  565. ret_dict = {}
  566. ret_dict['Status'] = 'Modify successed.'
  567. ret_dict['Storage IP'] = store_serv.ip_addr
  568. return ret_dict
  569. def storage_modify_by_filename(self, tracker_client, store_serv, \
  570. filename, offset, \
  571. filesize, appender_filename):
  572. return self._storage_do_modify_file(tracker_client, store_serv, \
  573. FDFS_UPLOAD_BY_FILENAME, filename, offset, \
  574. filesize, appender_filename)
  575. def storage_modify_by_file(self, tracker_client, store_serv, \
  576. filename, offset, \
  577. filesize, appender_filename):
  578. return self._storage_do_modify_file(tracker_client, store_serv, \
  579. FDFS_UPLOAD_BY_FILE, filename, offset, \
  580. filesize, appender_filename)
  581. def storage_modify_by_buffer(self, tracker_client, store_serv, \
  582. filebuffer, offset, \
  583. filesize, appender_filename):
  584. return self._storage_do_modify_file(tracker_client, store_serv, \
  585. FDFS_UPLOAD_BY_BUFFER, filebuffer, offset, \
  586. filesize, appender_filename)