storage_client.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: storage_client.py
import os, stat
import struct
import socket
import sys
import datetime
import errno
from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
from fdfs_client.sendfile import *
from fdfs_client.exceptions import (
    FDFSError,
    ConnectionError,
    ResponseError,
    InvaildResponse,
    DataError
)
from fdfs_client.utils import *

def tcp_send_file(conn, filename, buffer_size=1024):
    '''
    Send a file to the server, splitting it into multiple packages while sending.
    arguments:
    @conn: connection
    @filename: string
    @buffer_size: int, send buffer size
    @Return int: file size on success, else raise ConnectionError.
    '''
    file_size = 0
    with open(filename, 'rb') as f:
        while 1:
            try:
                send_buffer = f.read(buffer_size)
                send_size = len(send_buffer)
                if send_size == 0:
                    break
                tcp_send_data(conn, send_buffer)
                file_size += send_size
            except ConnectionError as e:
                raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
            except IOError as e:
                raise DataError('[-] Error while reading local file(%s).' % e.args)
    return file_size

def tcp_send_file_ex(conn, filename, buffer_size=4096):
    '''
    Send a file to the server using the Linux 'sendfile' system call.
    arguments:
    @conn: connection
    @filename: string
    @Return long: sent size
    '''
    if 'linux' not in sys.platform.lower():
        raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
    nbytes = 0
    offset = 0
    sock_fd = conn.get_sock().fileno()
    with open(filename, 'rb') as f:
        in_fd = f.fileno()
        while 1:
            try:
                # Zero-copy transfer from the file descriptor to the socket.
                sent = sendfile(sock_fd, in_fd, offset, buffer_size)
                if 0 == sent:
                    break
                nbytes += sent
                offset += sent
            except OSError as e:
                if e.errno == errno.EAGAIN:
                    continue
                raise
    return nbytes
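
# Illustrative helper (not part of the original client): pick the zero-copy
# sendfile() path on Linux and fall back to buffered sends elsewhere. The
# function name and default buffer size are assumptions made for this example.
def _example_send_local_file(conn, filename, buffer_size=4096):
    # tcp_send_file_ex wraps the Linux-only sendfile(2) system call and raises
    # DataError on other platforms, so guard on sys.platform first.
    if 'linux' in sys.platform.lower():
        return tcp_send_file_ex(conn, filename, buffer_size)
    return tcp_send_file(conn, filename, buffer_size)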

def tcp_recv_file(conn, local_filename, file_size, buffer_size=1024):
    '''
    Receive a file from the server, writing it to disk in fragments as it arrives.
    arguments:
    @conn: connection
    @local_filename: string
    @file_size: int, remote file size
    @buffer_size: int, receive buffer size
    @Return int: file size on success, else raise ConnectionError.
    '''
    total_file_size = 0
    flush_size = 0
    remain_bytes = file_size
    with open(local_filename, 'wb+') as f:
        # Loop until every remote byte has been received; this also covers
        # files smaller than buffer_size.
        while total_file_size < remain_bytes:
            diff_size = remain_bytes - total_file_size
            try:
                if diff_size >= buffer_size:
                    file_buffer, recv_size = tcp_recv_response(conn, buffer_size,
                                                               buffer_size)
                else:
                    file_buffer, recv_size = tcp_recv_response(conn, diff_size,
                                                               buffer_size)
                f.write(file_buffer)
                total_file_size += recv_size
                flush_size += recv_size
                if flush_size >= 4096:
                    f.flush()
                    flush_size = 0
            except ConnectionError as e:
                raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
            except IOError as e:
                raise DataError('[-] Error: while writing local file(%s).' % e.args)
    return total_file_size

class Storage_client(object):
    """
    The Storage_client class for the storage server.
    Note: host_tuple of the storage server address should contain a single element.
    """

    def __init__(self, *args):
        # args: (ip_addr, port, timeout)
        conn_kwargs = {
            'name': 'Storage Pool',
            'host_tuple': ((args[0], args[1]),),
            'timeout': args[2]
        }
        self.pool = ConnectionPool(**conn_kwargs)
        return None

    def __del__(self):
        try:
            self.pool.destroy()
            self.pool = None
        except:
            pass

    def update_pool(self, old_store_serv, new_store_serv, timeout=30):
        '''
        Update the connection pool of the storage client.
        The pool must be rebuilt when the storage server changes; if the
        server is unchanged, do nothing.
        '''
        if old_store_serv.ip_addr == new_store_serv.ip_addr:
            return None
        self.pool.destroy()
        conn_kwargs = {
            'name': 'Storage_pool',
            'host_tuple': ((new_store_serv.ip_addr, new_store_serv.port),),
            'timeout': timeout
        }
        self.pool = ConnectionPool(**conn_kwargs)
        return True
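
    # Illustrative sketch (not in the original source): how the pool is built and
    # refreshed. The address, port and timeout values below are placeholders.
    #
    #   storage = Storage_client('192.0.2.10', 23000, 30)
    #   ...
    #   # After a later tracker query returns a different storage server:
    #   storage.update_pool(old_store_serv, new_store_serv, timeout=30)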

    def _storage_do_upload_file(self, tracker_client, store_serv, file_buffer, file_size=None, upload_type=None,
                                meta_dict=None, cmd=None, master_filename=None, prefix_name=None, file_ext_name=None):
        """
        Core of file upload.
        arguments:
        @tracker_client: Tracker_client, used to connect to the tracker server
        @store_serv: Storage_server, returned by a tracker server query
        @file_buffer: string, file name or file buffer to send
        @file_size: int
        @upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
                      FDFS_UPLOAD_BY_BUFFER
        @meta_dict: dictionary, metadata to store
        @cmd: int, see the fdfs protocol
        @master_filename: string, used when uploading a slave file
        @prefix_name: string
        @file_ext_name: string
        @Return dictionary
        {
            'Group name'      : group_name,
            'Remote file_id'  : remote_file_id,
            'Status'          : status,
            'Local file name' : local_filename,
            'Uploaded size'   : upload_size,
            'Storage IP'      : storage_ip
        }
        """
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        master_filename_len = len(master_filename) if master_filename else 0
        prefix_name_len = len(prefix_name) if prefix_name else 0
        upload_slave = len(store_serv.group_name) and master_filename_len
        file_ext_name = str(file_ext_name) if file_ext_name else ''
        # non_slave_fmt: |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
        non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
        # slave_fmt: |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
        #            -master_name(master_filename_len)-|
        slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, FDFS_FILE_EXT_NAME_MAX_LEN, master_filename_len)
        th.pkg_len = struct.calcsize(slave_fmt) if upload_slave \
            else struct.calcsize(non_slave_fmt)
        th.pkg_len += file_size
        th.cmd = cmd
        th.send_header(store_conn)
        if upload_slave:
            send_buffer = struct.pack(slave_fmt, master_filename_len, file_size, prefix_name.encode(),
                                      file_ext_name.encode(), master_filename.encode())
        else:
            send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, file_size, file_ext_name.encode())
        try:
            tcp_send_data(store_conn, send_buffer)
            if upload_type == FDFS_UPLOAD_BY_FILENAME:
                send_file_size = tcp_send_file(store_conn, file_buffer)
            elif upload_type == FDFS_UPLOAD_BY_BUFFER:
                tcp_send_data(store_conn, file_buffer)
            elif upload_type == FDFS_UPLOAD_BY_FILE:
                send_file_size = tcp_send_file_ex(store_conn, file_buffer)
            th.recv_header(store_conn)
            if th.status != 0:
                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
            recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
            if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
                errmsg = '[-] Error: Storage response length does not match, '
                errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
                raise ResponseError(errmsg)
            # recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
            recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN,
                                     th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
            (group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
            remote_filename = remote_name.strip(b'\x00').decode()
            if meta_dict and len(meta_dict) > 0:
                status = self.storage_set_metadata(tracker_client, store_serv, remote_filename, meta_dict)
                if status != 0:
                    # rollback the upload if setting metadata failed
                    self.storage_delete_file(tracker_client, store_serv, remote_filename)
                    raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
        except:
            raise
        finally:
            self.pool.release(store_conn)
        ret_dic = {
            'Group name': group_name.strip(b'\x00').decode(),
            'Remote file_id': group_name.strip(b'\x00').decode() + os.sep + remote_filename,
            'Status': 'Upload succeeded.',
            'Local file name': file_buffer if (
                upload_type == FDFS_UPLOAD_BY_FILENAME or upload_type == FDFS_UPLOAD_BY_FILE) else '',
            'Uploaded size': appromix(send_file_size) if (
                upload_type == FDFS_UPLOAD_BY_FILENAME or upload_type == FDFS_UPLOAD_BY_FILE) else appromix(
                len(file_buffer)),
            'Storage IP': store_serv.ip_addr
        }
        return ret_dic

    def storage_upload_by_filename(self, tracker_client, store_serv, filename,
                                   meta_dict=None):
        file_size = os.stat(filename).st_size
        file_ext_name = get_file_ext_name(filename)
        return self._storage_do_upload_file(tracker_client, store_serv, filename,
                                            file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_FILE, None,
                                            None, file_ext_name)

    def storage_upload_by_file(self, tracker_client, store_serv, filename,
                               meta_dict=None):
        file_size = os.stat(filename).st_size
        file_ext_name = get_file_ext_name(filename)
        return self._storage_do_upload_file(tracker_client, store_serv, filename,
                                            file_size, FDFS_UPLOAD_BY_FILE, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_FILE, None,
                                            None, file_ext_name)

    def storage_upload_by_buffer(self, tracker_client, store_serv,
                                 file_buffer, file_ext_name=None, meta_dict=None):
        buffer_size = len(file_buffer)
        return self._storage_do_upload_file(tracker_client, store_serv, file_buffer,
                                            buffer_size, FDFS_UPLOAD_BY_BUFFER, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_FILE, None,
                                            None, file_ext_name)
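
    # Illustrative sketch (not in the original source): a typical upload call
    # chain. The Tracker_client import path, its query method name and the
    # addresses below are assumptions for the example only.
    #
    #   from fdfs_client.tracker_client import Tracker_client
    #   tracker = Tracker_client('192.0.2.1', 22122, 30)
    #   store_serv = tracker.tracker_query_storage_stor_without_group()
    #   storage = Storage_client(store_serv.ip_addr, store_serv.port, 30)
    #   ret = storage.storage_upload_by_buffer(tracker, store_serv,
    #                                          b'hello fastdfs', 'txt')
    #   print(ret['Remote file_id'], ret['Uploaded size'])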

    def storage_upload_slave_by_filename(self, tracker_client, store_serv,
                                         filename, prefix_name, remote_filename,
                                         meta_dict=None):
        file_size = os.stat(filename).st_size
        file_ext_name = get_file_ext_name(filename)
        return self._storage_do_upload_file(tracker_client, store_serv, filename,
                                            file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE,
                                            remote_filename, prefix_name,
                                            file_ext_name)

    def storage_upload_slave_by_file(self, tracker_client, store_serv,
                                     filename, prefix_name, remote_filename,
                                     meta_dict=None):
        file_size = os.stat(filename).st_size
        file_ext_name = get_file_ext_name(filename)
        return self._storage_do_upload_file(tracker_client, store_serv, filename,
                                            file_size, FDFS_UPLOAD_BY_FILE, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE,
                                            remote_filename, prefix_name,
                                            file_ext_name)

    def storage_upload_slave_by_buffer(self, tracker_client, store_serv,
                                       filebuffer, remote_filename, meta_dict,
                                       file_ext_name):
        file_size = len(filebuffer)
        return self._storage_do_upload_file(tracker_client, store_serv,
                                            filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER,
                                            meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE,
                                            None, remote_filename, file_ext_name)

    def storage_upload_appender_by_filename(self, tracker_client, store_serv,
                                            filename, meta_dict=None):
        file_size = os.stat(filename).st_size
        file_ext_name = get_file_ext_name(filename)
        return self._storage_do_upload_file(tracker_client, store_serv, filename,
                                            file_size, FDFS_UPLOAD_BY_FILENAME, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE,
                                            None, None, file_ext_name)

    def storage_upload_appender_by_file(self, tracker_client, store_serv, filename, meta_dict=None):
        file_size = os.stat(filename).st_size
        file_ext_name = get_file_ext_name(filename)
        return self._storage_do_upload_file(tracker_client, store_serv, filename,
                                            file_size, FDFS_UPLOAD_BY_FILE, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE,
                                            None, None, file_ext_name)

    def storage_upload_appender_by_buffer(self, tracker_client, store_serv,
                                          file_buffer, meta_dict=None,
                                          file_ext_name=None):
        file_size = len(file_buffer)
        return self._storage_do_upload_file(tracker_client, store_serv, file_buffer,
                                            file_size, FDFS_UPLOAD_BY_BUFFER, meta_dict,
                                            STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE,
                                            None, None, file_ext_name)

    def storage_delete_file(self, tracker_client, store_serv, remote_filename):
        '''
        Delete a file from the storage server.
        '''
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
        file_name_len = len(remote_filename)
        th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
        try:
            th.send_header(store_conn)
            # del_fmt: |-group_name(16)-filename(len)-|
            del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
            send_buffer = struct.pack(del_fmt, store_serv.group_name.encode(), remote_filename.encode())
            tcp_send_data(store_conn, send_buffer)
            th.recv_header(store_conn)
            # if th.status == 2:
            #     raise DataError('[-] Error: remote file %s does not exist.'
            #                     % (store_serv.group_name + os.sep + remote_filename))
            if th.status != 0:
                raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
            # recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
        except:
            raise
        finally:
            self.pool.release(store_conn)
        remote_filename = store_serv.group_name + os.sep + remote_filename
        return ('Delete file succeeded.', remote_filename, store_serv.ip_addr)

    def _storage_do_download_file(self, tracker_client, store_serv, file_buffer,
                                  offset, download_size, download_type, remote_filename):
        '''
        Core of file download from the storage server.
        The download type can be FDFS_DOWNLOAD_TO_FILE or FDFS_DOWNLOAD_TO_BUFFER,
        and a file offset can be given.
        @Return dictionary
        {
            'Remote file name' : remote_filename,
            'Content'          : local_filename or buffer,
            'Download size'    : download_size,
            'Storage IP'       : storage_ip
        }
        '''
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        remote_filename_len = len(remote_filename)
        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + \
                     remote_filename_len
        th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
        try:
            th.send_header(store_conn)
            # down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
            down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
            send_buffer = struct.pack(down_fmt, offset, download_size, store_serv.group_name.encode(),
                                      remote_filename.encode())
            tcp_send_data(store_conn, send_buffer)
            th.recv_header(store_conn)
            # if th.status == 2:
            #     raise DataError('[-] Error: remote file %s does not exist.' %
            #                     (store_serv.group_name + os.sep + remote_filename))
            if th.status != 0:
                raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
            if download_type == FDFS_DOWNLOAD_TO_FILE:
                total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
            elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
                recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
        except:
            raise
        finally:
            self.pool.release(store_conn)
        ret_dic = {
            'Remote file_id': store_serv.group_name + os.sep + remote_filename,
            'Content': file_buffer if download_type == FDFS_DOWNLOAD_TO_FILE else recv_buffer,
            'Download size': appromix(total_recv_size),
            'Storage IP': store_serv.ip_addr
        }
        return ret_dic

    def storage_download_to_file(self, tracker_client, store_serv, local_filename,
                                 file_offset, download_bytes, remote_filename):
        return self._storage_do_download_file(tracker_client, store_serv, local_filename,
                                              file_offset, download_bytes,
                                              FDFS_DOWNLOAD_TO_FILE, remote_filename)

    def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, file_offset, download_bytes,
                                   remote_filename):
        return self._storage_do_download_file(tracker_client, store_serv, file_buffer, file_offset, download_bytes,
                                              FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
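
    # Illustrative sketch (not in the original source): downloading a file to a
    # local path. 'tracker', 'store_serv' and 'remote_filename' (the file name
    # without the group prefix) come from a prior tracker query and upload; the
    # local path, offset 0 and download size 0 (whole file) are placeholders.
    #
    #   ret = storage.storage_download_to_file(tracker, store_serv, '/tmp/copy.txt',
    #                                          0, 0, remote_filename)
    #   print(ret['Download size'])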

    def storage_set_metadata(self, tracker_client, store_serv, remote_filename, meta_dict,
                             op_flag=STORAGE_SET_METADATA_FLAG_OVERWRITE):
        ret = 0
        conn = self.pool.get_connection()
        remote_filename_len = len(remote_filename)
        meta_buffer = fdfs_pack_metadata(meta_dict)
        meta_len = len(meta_buffer)
        th = Tracker_header()
        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + \
                     FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
        th.cmd = STORAGE_PROTO_CMD_SET_METADATA
        try:
            th.send_header(conn)
            # meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
            #           -filename(remote_filename_len)-meta(meta_len)-|
            meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len, meta_len)
            send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, op_flag, store_serv.group_name.encode(),
                                      remote_filename.encode(), meta_buffer)
            tcp_send_data(conn, send_buffer)
            th.recv_header(conn)
            if th.status != 0:
                ret = th.status
        except:
            raise
        finally:
            self.pool.release(conn)
        return ret

    def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        remote_filename_len = len(remote_file_name)
        th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
        th.cmd = STORAGE_PROTO_CMD_GET_METADATA
        try:
            th.send_header(store_conn)
            # meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
            meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
            send_buffer = struct.pack(meta_fmt, store_serv.group_name.encode(), remote_file_name.encode())
            tcp_send_data(store_conn, send_buffer)
            th.recv_header(store_conn)
            # if th.status == 2:
            #     raise DataError('[-] Error: Remote file %s has no meta data.'
            #                     % (store_serv.group_name + os.sep + remote_file_name))
            if th.status != 0:
                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
            if th.pkg_len == 0:
                ret_dict = {}
            meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
        except:
            raise
        finally:
            self.pool.release(store_conn)
        ret_dict = fdfs_unpack_metadata(meta_buffer)
        return ret_dict
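
    # Illustrative sketch (not in the original source): setting and reading back
    # metadata on an uploaded file. 'tracker', 'store_serv' and 'remote_filename'
    # are assumed to come from a prior upload.
    #
    #   status = storage.storage_set_metadata(tracker, store_serv, remote_filename,
    #                                         {'width': '1024', 'height': '768'})
    #   if status == 0:
    #       print(storage.storage_get_metadata(tracker, store_serv, remote_filename))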

    def _storage_do_append_file(self, tracker_client, store_serv, file_buffer,
                                file_size, upload_type, appended_filename):
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        appended_filename_len = len(appended_filename)
        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
        th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
        try:
            th.send_header(store_conn)
            # append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
            #             -filecontent(filesize)-|
            append_fmt = '!Q Q %ds' % appended_filename_len
            send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, appended_filename.encode())
            tcp_send_data(store_conn, send_buffer)
            if upload_type == FDFS_UPLOAD_BY_FILENAME:
                tcp_send_file(store_conn, file_buffer)
            elif upload_type == FDFS_UPLOAD_BY_BUFFER:
                tcp_send_data(store_conn, file_buffer)
            elif upload_type == FDFS_UPLOAD_BY_FILE:
                tcp_send_file_ex(store_conn, file_buffer)
            th.recv_header(store_conn)
            if th.status != 0:
                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
        except:
            raise
        finally:
            self.pool.release(store_conn)
        ret_dict = {
            'Status': 'Append file succeeded.',
            'Appender file name': store_serv.group_name + os.sep + appended_filename,
            'Appended size': appromix(file_size),
            'Storage IP': store_serv.ip_addr
        }
        return ret_dict

    def storage_append_by_filename(self, tracker_client, store_serv,
                                   local_filename, appended_filename):
        file_size = os.stat(local_filename).st_size
        return self._storage_do_append_file(tracker_client, store_serv,
                                            local_filename, file_size,
                                            FDFS_UPLOAD_BY_FILENAME, appended_filename)

    def storage_append_by_file(self, tracker_client, store_serv,
                               local_filename, appended_filename):
        file_size = os.stat(local_filename).st_size
        return self._storage_do_append_file(tracker_client, store_serv,
                                            local_filename, file_size,
                                            FDFS_UPLOAD_BY_FILE, appended_filename)

    def storage_append_by_buffer(self, tracker_client, store_serv,
                                 file_buffer, appended_filename):
        file_size = len(file_buffer)
        return self._storage_do_append_file(tracker_client, store_serv,
                                            file_buffer, file_size,
                                            FDFS_UPLOAD_BY_BUFFER, appended_filename)
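
    # Illustrative sketch (not in the original source): the appender work flow.
    # An appender file is created first, then extended in place; 'tracker' and
    # 'store_serv' are assumed to come from a tracker query.
    #
    #   ret = storage.storage_upload_appender_by_buffer(tracker, store_serv,
    #                                                   b'part-1', 'log')
    #   appender_name = ret['Remote file_id'].split(os.sep, 1)[1]
    #   storage.storage_append_by_buffer(tracker, store_serv, b'part-2', appender_name)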

    def _storage_do_truncate_file(self, tracker_client, store_serv,
                                  truncated_filesize, appender_filename):
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
        appender_filename_len = len(appender_filename)
        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
        try:
            th.send_header(store_conn)
            # truncate_fmt: |-appender_filename_len(8)-truncate_filesize(8)
            #               -appender_filename(len)-|
            truncate_fmt = '!Q Q %ds' % appender_filename_len
            send_buffer = struct.pack(truncate_fmt, appender_filename_len, truncated_filesize,
                                      appender_filename.encode())
            tcp_send_data(store_conn, send_buffer)
            th.recv_header(store_conn)
            if th.status != 0:
                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
        except:
            raise
        finally:
            self.pool.release(store_conn)
        ret_dict = {}
        ret_dict['Status'] = 'Truncate succeeded.'
        ret_dict['Storage IP'] = store_serv.ip_addr
        return ret_dict

    def storage_truncate_file(self, tracker_client, store_serv,
                              truncated_filesize, appender_filename):
        return self._storage_do_truncate_file(tracker_client, store_serv,
                                              truncated_filesize, appender_filename)

    def _storage_do_modify_file(self, tracker_client, store_serv, upload_type,
                                filebuffer, offset, filesize, appender_filename):
        store_conn = self.pool.get_connection()
        th = Tracker_header()
        th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
        appender_filename_len = len(appender_filename)
        th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
        try:
            th.send_header(store_conn)
            # modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
            modify_fmt = '!Q Q Q %ds' % appender_filename_len
            send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, filesize, appender_filename.encode())
            tcp_send_data(store_conn, send_buffer)
            if upload_type == FDFS_UPLOAD_BY_FILENAME:
                upload_size = tcp_send_file(store_conn, filebuffer)
            elif upload_type == FDFS_UPLOAD_BY_BUFFER:
                tcp_send_data(store_conn, filebuffer)
            elif upload_type == FDFS_UPLOAD_BY_FILE:
                upload_size = tcp_send_file_ex(store_conn, filebuffer)
            th.recv_header(store_conn)
            if th.status != 0:
                raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
        except:
            raise
        finally:
            self.pool.release(store_conn)
        ret_dict = {}
        ret_dict['Status'] = 'Modify succeeded.'
        ret_dict['Storage IP'] = store_serv.ip_addr
        return ret_dict

    def storage_modify_by_filename(self, tracker_client, store_serv, filename, offset, filesize, appender_filename):
        return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_FILENAME, filename, offset,
                                            filesize, appender_filename)

    def storage_modify_by_file(self, tracker_client, store_serv, filename, offset, filesize, appender_filename):
        return self._storage_do_modify_file(tracker_client, store_serv,
                                            FDFS_UPLOAD_BY_FILE, filename, offset,
                                            filesize, appender_filename)

    def storage_modify_by_buffer(self, tracker_client, store_serv,
                                 filebuffer, offset,
                                 filesize, appender_filename):
        return self._storage_do_modify_file(tracker_client, store_serv,
                                            FDFS_UPLOAD_BY_BUFFER, filebuffer, offset,
                                            filesize, appender_filename)
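

# Illustrative usage sketch (not part of the original module). The address,
# port and timeout are placeholders; a real session would also need a
# Tracker_client and the Storage_server record it returns (see the commented
# examples inside the class above).
if __name__ == '__main__':
    storage = Storage_client('192.0.2.10', 23000, 30)
    # Constructing the client only prepares a connection pool towards the
    # given storage address; uploads, downloads and deletes additionally take
    # the tracker_client and store_serv arguments shown above.
    print(storage.pool)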