#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: tracker_client.py
import os
import socket
import struct
from datetime import datetime

from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
from fdfs_client.exceptions import (
    FDFSError,
    ConnectionError,
    ResponseError,
    InvaildResponse,
    DataError,
)
from fdfs_client.utils import *
  17. def parse_storage_status(status_code):
  18. try:
  19. ret = {
  20. FDFS_STORAGE_STATUS_INIT: lambda: 'INIT',
  21. FDFS_STORAGE_STATUS_WAIT_SYNC: lambda: 'WAIT_SYNC',
  22. FDFS_STORAGE_STATUS_SYNCING: lambda: 'SYNCING',
  23. FDFS_STORAGE_STATUS_IP_CHANGED: lambda: 'IP_CHANGED',
  24. FDFS_STORAGE_STATUS_DELETED: lambda: 'DELETED',
  25. FDFS_STORAGE_STATUS_OFFLINE: lambda: 'OFFLINE',
  26. FDFS_STORAGE_STATUS_ONLINE: lambda: 'ONLINE',
  27. FDFS_STORAGE_STATUS_ACTIVE: lambda: 'ACTIVE',
  28. FDFS_STORAGE_STATUS_RECOVERY: lambda: 'RECOVERY'
  29. }[status_code]()
  30. except KeyError:
  31. ret = 'UNKNOW'
  32. return ret
  33. class Storage_info(object):
  34. def __init__(self):
  35. self.status = 0
  36. self.id = ''
  37. self.ip_addr = ''
  38. self.domain_name = ''
  39. self.src_ip_addr = ''
  40. self.version = ''
  41. self.totalMB = ''
  42. self.freeMB = ''
  43. self.upload_prio = 0
  44. self.join_time = datetime.fromtimestamp(0).isoformat()
  45. self.up_time = datetime.fromtimestamp(0).isoformat()
  46. self.store_path_count = 0
  47. self.subdir_count_per_path = 0
  48. self.storage_port = 23000
  49. self.storage_http_port = 80
  50. self.curr_write_path = 0
  51. self.total_upload_count = 0
  52. self.success_upload_count = 0
  53. self.total_append_count = 0
  54. self.success_append_count = 0
  55. self.total_modify_count = 0
  56. self.success_modify_count = 0
  57. self.total_truncate_count = 0
  58. self.success_truncate_count = 0
  59. self.total_setmeta_count = 0
  60. self.success_setmeta_count = 0
  61. self.total_del_count = 0
  62. self.success_del_count = 0
  63. self.total_download_count = 0
  64. self.success_download_count = 0
  65. self.total_getmeta_count = 0
  66. self.success_getmeta_count = 0
  67. self.total_create_link_count = 0
  68. self.success_create_link_count = 0
  69. self.total_del_link_count = 0
  70. self.success_del_link_count = 0
  71. self.total_upload_bytes = 0
  72. self.success_upload_bytes = 0
  73. self.total_append_bytes = 0
  74. self.success_append_bytes = 0
  75. self.total_modify_bytes = 0
  76. self.success_modify_bytes = 0
  77. self.total_download_bytes = 0
  78. self.success_download_bytes = 0
  79. self.total_sync_in_bytes = 0
  80. self.success_sync_in_bytes = 0
  81. self.total_sync_out_bytes = 0
  82. self.success_sync_out_bytes = 0
  83. self.total_file_open_count = 0
  84. self.success_file_open_count = 0
  85. self.total_file_read_count = 0
  86. self.success_file_read_count = 0
  87. self.total_file_write_count = 0
  88. self.success_file_write_count = 0
  89. self.last_source_sync = datetime.fromtimestamp(0).isoformat()
  90. self.last_sync_update = datetime.fromtimestamp(0).isoformat()
  91. self.last_synced_time = datetime.fromtimestamp(0).isoformat()
  92. self.last_heartbeat_time = datetime.fromtimestamp(0).isoformat()
  93. self.if_trunk_server = 0
  94. # fmt = |-status(1)-ipaddr(16)-domain(128)-srcipaddr(16)-ver(6)-52*8-|
  95. self.fmt = '!B %ds %ds %ds %ds %ds 52QB' % (FDFS_STORAGE_ID_MAX_SIZE, \
  96. IP_ADDRESS_SIZE, \
  97. FDFS_DOMAIN_NAME_MAX_LEN, \
  98. IP_ADDRESS_SIZE, \
  99. FDFS_VERSION_SIZE)
  100. def set_info(self, bytes_stream):
  101. (self.status, id, ip_addr, domain_name, src_ip_addr, version, join_time, up_time, totalMB, freeMB, self.upload_prio,
  102. self.store_path_count, self.subdir_count_per_path, self.storage_port, self.storage_http_port, self.curr_write_path,
  103. self.total_upload_count, self.success_upload_count, self.total_append_count, self.success_append_count, self.total_modify_count, self.success_modify_count,
  104. self.total_truncate_count, self.success_truncate_count, self.total_setmeta_count, self.success_setmeta_count,
  105. self.total_del_count, self.success_del_count, self.total_download_count, self.success_download_count, self.total_getmeta_count, self.success_getmeta_count,
  106. self.total_create_link_count, self.success_create_link_count, self.total_del_link_count, self.success_del_link_count,
  107. self.total_upload_bytes, self.success_upload_bytes, self.total_append_bytes, self.total_append_bytes, self.total_modify_bytes, self.success_modify_bytes,
  108. self.total_download_bytes, self.success_download_bytes, self.total_sync_in_bytes, self.success_sync_in_bytes,
  109. self.total_sync_out_bytes, self.success_sync_out_bytes, self.total_file_open_count, self.success_file_open_count,
  110. self.total_file_read_count, self.success_file_read_count, self.total_file_write_count, self.success_file_write_count,
  111. last_source_sync, last_sync_update, last_synced_time, last_heartbeat_time, self.if_trunk_server) \
  112. = struct.unpack(self.fmt, bytes_stream)
  113. try:
  114. self.id = id.strip(b'\x00').decode()
  115. self.ip_addr = ip_addr.strip(b'\x00').decode()
  116. self.domain_name = domain_name.strip(b'\x00').decode()
  117. self.version = version.strip(b'\x00').decode()
  118. self.src_ip_addr = src_ip_addr.strip(b'\x00').decode()
  119. self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
  120. self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
  121. except ValueError as e:
  122. raise ResponseError('[-] Error: disk space overrun, can not represented it.')
  123. self.join_time = datetime.fromtimestamp(join_time).isoformat()
  124. self.up_time = datetime.fromtimestamp(up_time).isoformat()
  125. self.last_source_sync = datetime.fromtimestamp(last_source_sync).isoformat()
  126. self.last_sync_update = datetime.fromtimestamp(last_sync_update).isoformat()
  127. self.last_synced_time = datetime.fromtimestamp(last_synced_time).isoformat()
  128. self.last_heartbeat_time = \
  129. datetime.fromtimestamp(last_heartbeat_time).isoformat()
  130. return True
  131. def __str__(self):
  132. """Transform to readable string."""
  133. s = 'Storage information:\n'
  134. s += '\tid = %s\n' % (self.id)
  135. s += '\tip_addr = %s (%s)\n' % (self.ip_addr, parse_storage_status(self.status))
  136. s += '\thttp domain = %s\n' % self.domain_name
  137. s += '\tversion = %s\n' % self.version
  138. s += '\tjoin time = %s\n' % self.join_time
  139. s += '\tup time = %s\n' % self.up_time
  140. s += '\ttotal storage = %s\n' % self.totalMB
  141. s += '\tfree storage = %s\n' % self.freeMB
  142. s += '\tupload priority = %d\n' % self.upload_prio
  143. s += '\tstore path count = %d\n' % self.store_path_count
  144. s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
  145. s += '\tstorage port = %d\n' % self.storage_port
  146. s += '\tstorage HTTP port = %d\n' % self.storage_http_port
  147. s += '\tcurrent write path = %d\n' % self.curr_write_path
  148. s += '\tsource ip_addr = %s\n' % self.src_ip_addr
  149. s += '\tif_trunk_server = %d\n' % self.if_trunk_server
  150. s += '\ttotal upload count = %ld\n' % self.total_upload_count
  151. s += '\tsuccess upload count = %ld\n' % self.success_upload_count
  152. s += '\ttotal download count = %ld\n' % self.total_download_count
  153. s += '\tsuccess download count = %ld\n' % self.success_download_count
  154. s += '\ttotal append count = %ld\n' % self.total_append_count
  155. s += '\tsuccess append count = %ld\n' % self.success_append_count
  156. s += '\ttotal modify count = %ld\n' % self.total_modify_count
  157. s += '\tsuccess modify count = %ld\n' % self.success_modify_count
  158. s += '\ttotal truncate count = %ld\n' % self.total_truncate_count
  159. s += '\tsuccess truncate count = %ld\n' % self.success_truncate_count
  160. s += '\ttotal delete count = %ld\n' % self.total_del_count
  161. s += '\tsuccess delete count = %ld\n' % self.success_del_count
  162. s += '\ttotal set_meta count = %ld\n' % self.total_setmeta_count
  163. s += '\tsuccess set_meta count = %ld\n' % self.success_setmeta_count
  164. s += '\ttotal get_meta count = %ld\n' % self.total_getmeta_count
  165. s += '\tsuccess get_meta count = %ld\n' % self.success_getmeta_count
  166. s += '\ttotal create link count = %ld\n' % self.total_create_link_count
  167. s += '\tsuccess create link count = %ld\n' % self.success_create_link_count
  168. s += '\ttotal delete link count = %ld\n' % self.total_del_link_count
  169. s += '\tsuccess delete link count = %ld\n' % self.success_del_link_count
  170. s += '\ttotal upload bytes = %ld\n' % self.total_upload_bytes
  171. s += '\tsuccess upload bytes = %ld\n' % self.success_upload_bytes
  172. s += '\ttotal download bytes = %ld\n' % self.total_download_bytes
  173. s += '\tsuccess download bytes = %ld\n' % self.success_download_bytes
  174. s += '\ttotal append bytes = %ld\n' % self.total_append_bytes
  175. s += '\tsuccess append bytes = %ld\n' % self.success_append_bytes
  176. s += '\ttotal modify bytes = %ld\n' % self.total_modify_bytes
  177. s += '\tsuccess modify bytes = %ld\n' % self.success_modify_bytes
  178. s += '\ttotal sync_in bytes = %ld\n' % self.total_sync_in_bytes
  179. s += '\tsuccess sync_in bytes = %ld\n' % self.success_sync_in_bytes
  180. s += '\ttotal sync_out bytes = %ld\n' % self.total_sync_out_bytes
  181. s += '\tsuccess sync_out bytes = %ld\n' % self.success_sync_out_bytes
  182. s += '\ttotal file open count = %ld\n' % self.total_file_open_count
  183. s += '\tsuccess file open count = %ld\n' % self.success_file_open_count
  184. s += '\ttotal file read count = %ld\n' % self.total_file_read_count
  185. s += '\tsuccess file read count = %ld\n' % self.success_file_read_count
  186. s += '\ttotal file write count = %ld\n' % self.total_file_write_count
  187. s += '\tsucess file write count = %ld\n' % self.success_file_write_count
  188. s += '\tlast heartbeat time = %s\n' % self.last_heartbeat_time
  189. s += '\tlast source update = %s\n' % self.last_source_sync
  190. s += '\tlast sync update = %s\n' % self.last_sync_update
  191. s += '\tlast synced time = %s\n' % self.last_synced_time
  192. return s
  193. def get_fmt_size(self):
  194. return struct.calcsize(self.fmt)
  195. class Group_info(object):
  196. def __init__(self):
  197. self.group_name = ''
  198. self.totalMB = ''
  199. self.freeMB = ''
  200. self.trunk_freeMB = ''
  201. self.count = 0
  202. self.storage_port = 0
  203. self.store_http_port = 0
  204. self.active_count = 0
  205. self.curr_write_server = 0
  206. self.store_path_count = 0
  207. self.subdir_count_per_path = 0
  208. self.curr_trunk_file_id = 0
  209. self.fmt = '!%ds 11Q' % (FDFS_GROUP_NAME_MAX_LEN + 1)
  210. return None
  211. def __str__(self):
  212. s = 'Group information:\n'
  213. s += '\tgroup name = %s\n' % self.group_name
  214. s += '\tdisk total space = %s\n' % self.totalMB
  215. s += '\tdisk free space = %s\n' % self.freeMB
  216. s += '\ttrunk free space = %s\n' % self.trunk_freeMB
  217. s += '\tstorage server count = %d\n' % self.count
  218. s += '\tstorage port = %d\n' % self.storage_port
  219. s += '\tstorage HTTP port = %d\n' % self.store_http_port
  220. s += '\tactive server count = %d\n' % self.active_count
  221. s += '\tcurrent write server index = %d\n' % self.curr_write_server
  222. s += '\tstore path count = %d\n' % self.store_path_count
  223. s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
  224. s += '\tcurrent trunk file id = %d\n' % self.curr_trunk_file_id
  225. return s
  226. def set_info(self, bytes_stream):
  227. (group_name, totalMB, freeMB, trunk_freeMB, self.count, self.storage_port, \
  228. self.store_http_port, self.active_count, self.curr_write_server, \
  229. self.store_path_count, self.subdir_count_per_path, self.curr_trunk_file_id) \
  230. = struct.unpack(self.fmt, bytes_stream)
  231. try:
  232. self.group_name = group_name.strip(b'\x00').decode()
  233. self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
  234. self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
  235. self.trunk_freeMB = appromix(trunk_freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
  236. except ValueError:
  237. raise DataError('[-] Error disk space overrun, can not represented it.')
  238. def get_fmt_size(self):
  239. return struct.calcsize(self.fmt)
  240. class Tracker_client(object):
  241. """Class Tracker client."""
  242. def __init__(self, pool):
  243. self.pool = pool
  244. def tracker_list_servers(self, group_name, storage_ip=None):
  245. """
  246. List servers in a storage group
  247. """
  248. conn = self.pool.get_connection()
  249. th = Tracker_header()
  250. ip_len = len(storage_ip) if storage_ip else 0
  251. if ip_len >= IP_ADDRESS_SIZE:
  252. ip_len = IP_ADDRESS_SIZE - 1
  253. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + ip_len
  254. th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_STORAGE
  255. group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
  256. store_ip_addr = storage_ip or ''
  257. storage_ip_fmt = '!%ds' % ip_len
  258. try:
  259. th.send_header(conn)
  260. send_buffer = struct.pack(group_fmt, group_name) + \
  261. struct.pack(storage_ip_fmt, store_ip_addr)
  262. tcp_send_data(conn, send_buffer)
  263. th.recv_header(conn)
  264. if th.status != 0:
  265. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  266. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  267. si = Storage_info()
  268. si_fmt_size = si.get_fmt_size()
  269. recv_size = len(recv_buffer)
  270. if recv_size % si_fmt_size != 0:
  271. errinfo = '[-] Error: response size not match, expect: %d, actual: %d' \
  272. % (th.pkg_len, recv_size)
  273. raise ResponseError(errinfo)
  274. except ConnectionError:
  275. conn.disconnect()
  276. raise
  277. finally:
  278. self.pool.release(conn)
  279. num_storage = recv_size / si_fmt_size
  280. si_list = []
  281. i = 0
  282. while num_storage:
  283. si.set_info(recv_buffer[(i * si_fmt_size): ((i + 1) * si_fmt_size)])
  284. si_list.append(si)
  285. si = Storage_info()
  286. num_storage -= 1
  287. i += 1
  288. ret_dict = {}
  289. ret_dict['Group name'] = group_name
  290. ret_dict['Servers'] = si_list
  291. return ret_dict
  292. def tracker_list_one_group(self, group_name):
  293. conn = self.pool.get_connection()
  294. th = Tracker_header()
  295. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
  296. th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP
  297. # group_fmt: |-group_name(16)-|
  298. group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
  299. try:
  300. th.send_header(conn)
  301. send_buffer = struct.pack(group_fmt, group_name)
  302. tcp_send_data(conn, send_buffer)
  303. th.recv_header(conn)
  304. if th.status != 0:
  305. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  306. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  307. group_info = Group_info()
  308. group_info.set_info(recv_buffer)
  309. except ConnectionError:
  310. conn.disconnect()
  311. raise
  312. finally:
  313. self.pool.release(conn)
  314. return group_info
  315. def tracker_list_all_groups(self):
  316. conn = self.pool.get_connection()
  317. th = Tracker_header()
  318. th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS
  319. try:
  320. th.send_header(conn)
  321. th.recv_header(conn)
  322. if th.status != 0:
  323. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  324. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  325. except ConnectionError:
  326. conn.disconnect()
  327. raise
  328. finally:
  329. self.pool.release(conn)
  330. gi = Group_info()
  331. gi_fmt_size = gi.get_fmt_size()
  332. if recv_size % gi_fmt_size != 0:
  333. errmsg = '[-] Error: Response size is mismatch, except: %d, actul: %d' \
  334. % (th.pkg_len, recv_size)
  335. raise ResponseError(errmsg)
  336. num_groups = recv_size / gi_fmt_size
  337. ret_dict = {}
  338. ret_dict['Groups count'] = num_groups
  339. gi_list = []
  340. i = 0
  341. while num_groups:
  342. gi.set_info(recv_buffer[i * gi_fmt_size: (i + 1) * gi_fmt_size])
  343. gi_list.append(gi)
  344. gi = Group_info()
  345. i += 1
  346. num_groups -= 1
  347. ret_dict['Groups'] = gi_list
  348. return ret_dict
  349. def tracker_query_storage_stor_without_group(self):
  350. """Query storage server for upload, without group name.
  351. Return: Storage_server object"""
  352. conn = self.pool.get_connection()
  353. th = Tracker_header()
  354. th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE
  355. try:
  356. th.send_header(conn)
  357. th.recv_header(conn)
  358. if th.status != 0:
  359. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  360. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  361. if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
  362. errmsg = '[-] Error: Tracker response length is invaild, '
  363. errmsg += 'expect: %d, actual: %d' \
  364. % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
  365. raise ResponseError(errmsg)
  366. except ConnectionError:
  367. conn.disconnect()
  368. raise
  369. finally:
  370. self.pool.release(conn)
  371. # recv_fmt |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)|
  372. recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
  373. store_serv = Storage_server()
  374. (group_name, ip_addr, \
  375. store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
  376. store_serv.group_name = group_name.strip(b'\x00').decode()
  377. store_serv.ip_addr = ip_addr.strip(b'\x00').decode()
  378. return store_serv
  379. def tracker_query_storage_stor_with_group(self, group_name):
  380. """Query storage server for upload, based group name.
  381. arguments:
  382. @group_name: string
  383. @Return Storage_server object
  384. """
  385. conn = self.pool.get_connection()
  386. th = Tracker_header()
  387. th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE
  388. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
  389. th.send_header(conn)
  390. group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
  391. send_buffer = struct.pack(group_fmt, group_name)
  392. try:
  393. tcp_send_data(conn, send_buffer)
  394. th.recv_header(conn)
  395. if th.status != 0:
  396. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  397. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  398. if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
  399. errmsg = '[-] Error: Tracker response length is invaild, '
  400. errmsg += 'expect: %d, actual: %d' \
  401. % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
  402. raise ResponseError(errmsg)
  403. except ConnectionError:
  404. conn.disconnect()
  405. raise
  406. finally:
  407. self.pool.release(conn)
  408. # recv_fmt: |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)-|
  409. recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
  410. store_serv = Storage_server()
  411. (group, ip_addr, \
  412. store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
  413. store_serv.group_name = group.strip(b'\x00').decode()
  414. store_serv.ip_addr = ip_addr.strip(b'\x00').decode()
  415. return store_serv
  416. def _tracker_do_query_storage(self, group_name, filename, cmd):
  417. """
  418. core of query storage, based group name and filename.
  419. It is useful download, delete and set meta.
  420. arguments:
  421. @group_name: string
  422. @filename: string. remote file_id
  423. @Return: Storage_server object
  424. """
  425. conn = self.pool.get_connection()
  426. th = Tracker_header()
  427. file_name_len = len(filename)
  428. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
  429. th.cmd = cmd
  430. th.send_header(conn)
  431. # query_fmt: |-group_name(16)-filename(file_name_len)-|
  432. query_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
  433. send_buffer = struct.pack(query_fmt, group_name.encode(), filename.encode())
  434. try:
  435. tcp_send_data(conn, send_buffer)
  436. th.recv_header(conn)
  437. if th.status != 0:
  438. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  439. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  440. if recv_size != TRACKER_QUERY_STORAGE_FETCH_BODY_LEN:
  441. errmsg = '[-] Error: Tracker response length is invaild, '
  442. errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
  443. raise ResponseError(errmsg)
  444. except ConnectionError:
  445. conn.disconnect()
  446. raise
  447. finally:
  448. self.pool.release(conn)
  449. # recv_fmt: |-group_name(16)-ip_addr(16)-port(8)-|
  450. recv_fmt = '!%ds %ds Q' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
  451. store_serv = Storage_server()
  452. (group_name, ipaddr, store_serv.port) = struct.unpack(recv_fmt, recv_buffer)
  453. store_serv.group_name = group_name.strip(b'\x00').decode()
  454. store_serv.ip_addr = ipaddr.strip(b'\x00').decode()
  455. return store_serv
  456. def tracker_query_storage_update(self, group_name, filename):
  457. """
  458. Query storage server to update(delete and set_meta).
  459. """
  460. return self._tracker_do_query_storage(group_name, filename, TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE)
  461. def tracker_query_storage_fetch(self, group_name, filename):
  462. """
  463. Query storage server to download.
  464. """
  465. return self._tracker_do_query_storage(group_name, filename, TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE)