tracker_client.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # filename: tracker_client.py
  4. import struct
  5. import socket
  6. from datetime import datetime
  7. from fdfs_client.fdfs_protol import *
  8. from fdfs_client.connection import *
  9. from fdfs_client.exceptions import (
  10. FDFSError,
  11. ConnectionError,
  12. ResponseError,
  13. InvaildResponse,
  14. DataError
  15. )
  16. from fdfs_client.utils import *
  17. def parse_storage_status(status_code):
  18. try:
  19. ret = {
  20. FDFS_STORAGE_STATUS_INIT : lambda : 'INIT',
  21. FDFS_STORAGE_STATUS_WAIT_SYNC : lambda : 'WAIT_SYNC',
  22. FDFS_STORAGE_STATUS_SYNCING : lambda : 'SYNCING',
  23. FDFS_STORAGE_STATUS_IP_CHANGED : lambda : 'IP_CHANGED',
  24. FDFS_STORAGE_STATUS_DELETED : lambda : 'DELETED',
  25. FDFS_STORAGE_STATUS_OFFLINE : lambda : 'OFFLINE',
  26. FDFS_STORAGE_STATUS_ONLINE : lambda : 'ONLINE',
  27. FDFS_STORAGE_STATUS_ACTIVE : lambda : 'ACTIVE',
  28. FDFS_STORAGE_STATUS_RECOVERY : lambda : 'RECOVERY'
  29. }[status_code]()
  30. except KeyError:
  31. ret = 'UNKNOW'
  32. return ret
  33. class Storage_info(object):
  34. def __init__(self):
  35. self.status = 0
  36. self.id = ''
  37. self.ip_addr = ''
  38. self.domain_name = ''
  39. self.src_ip_addr = ''
  40. self.version = ''
  41. self.totalMB = ''
  42. self.freeMB = ''
  43. self.upload_prio = 0
  44. self.join_time = datetime.fromtimestamp(0).isoformat()
  45. self.up_time = datetime.fromtimestamp(0).isoformat()
  46. self.store_path_count = 0
  47. self.subdir_count_per_path = 0
  48. self.storage_port = 23000
  49. self.storage_http_port = 80
  50. self.curr_write_path = 0
  51. self.total_upload_count = 0
  52. self.success_upload_count = 0
  53. self.total_append_count = 0
  54. self.success_append_count = 0
  55. self.total_modify_count = 0
  56. self.success_modify_count = 0
  57. self.total_truncate_count = 0
  58. self.success_truncate_count = 0
  59. self.total_setmeta_count = 0
  60. self.success_setmeta_count = 0
  61. self.total_del_count = 0
  62. self.success_del_count = 0
  63. self.total_download_count = 0
  64. self.success_download_count = 0
  65. self.total_getmeta_count = 0
  66. self.success_getmeta_count = 0
  67. self.total_create_link_count = 0
  68. self.success_create_link_count = 0
  69. self.total_del_link_count = 0
  70. self.success_del_link_count = 0
  71. self.total_upload_bytes = 0
  72. self.success_upload_bytes = 0
  73. self.total_append_bytes = 0
  74. self.success_append_bytes = 0
  75. self.total_modify_bytes = 0
  76. self.success_modify_bytes = 0
  77. self.total_download_bytes = 0
  78. self.success_download_bytes = 0
  79. self.total_sync_in_bytes = 0
  80. self.success_sync_in_bytes = 0
  81. self.total_sync_out_bytes = 0
  82. self.success_sync_out_bytes = 0
  83. self.total_file_open_count = 0
  84. self.success_file_open_count = 0
  85. self.total_file_read_count = 0
  86. self.success_file_read_count = 0
  87. self.total_file_write_count = 0
  88. self.success_file_write_count = 0
  89. self.last_source_sync = datetime.fromtimestamp(0).isoformat()
  90. self.last_sync_update = datetime.fromtimestamp(0).isoformat()
  91. self.last_synced_time = datetime.fromtimestamp(0).isoformat()
  92. self.last_heartbeat_time = datetime.fromtimestamp(0).isoformat()
  93. self.if_trunk_server = 0
  94. #fmt = |-status(1)-ipaddr(16)-domain(128)-srcipaddr(16)-ver(6)-52*8-|
  95. self.fmt = '!B %ds %ds %ds %ds %ds 52QB' % (FDFS_STORAGE_ID_MAX_SIZE, \
  96. IP_ADDRESS_SIZE, \
  97. FDFS_DOMAIN_NAME_MAX_LEN, \
  98. IP_ADDRESS_SIZE, \
  99. FDFS_VERSION_SIZE)
  100. def set_info(self, bytes_stream):
  101. (self.status, id, ip_addr, domain_name, \
  102. src_ip_addr, version, join_time,up_time, \
  103. totalMB, freeMB, self.upload_prio, \
  104. self.store_path_count, self.subdir_count_per_path, \
  105. self.storage_port, self.storage_http_port, \
  106. self.curr_write_path, \
  107. self.total_upload_count, self.success_upload_count, \
  108. self.total_append_count, self.success_append_count, \
  109. self.total_modify_count, self.success_modify_count, \
  110. self.total_truncate_count,self.success_truncate_count, \
  111. self.total_setmeta_count, self.success_setmeta_count, \
  112. self.total_del_count, self.success_del_count, \
  113. self.total_download_count,self.success_download_count, \
  114. self.total_getmeta_count, self.success_getmeta_count, \
  115. self.total_create_link_count, self.success_create_link_count, \
  116. self.total_del_link_count, self.success_del_link_count, \
  117. self.total_upload_bytes, self.success_upload_bytes, \
  118. self.total_append_bytes, self.total_append_bytes, \
  119. self.total_modify_bytes, self.success_modify_bytes, \
  120. self.total_download_bytes, self.success_download_bytes, \
  121. self.total_sync_in_bytes, self.success_sync_in_bytes, \
  122. self.total_sync_out_bytes, self.success_sync_out_bytes, \
  123. self.total_file_open_count, self.success_file_open_count, \
  124. self.total_file_read_count, self.success_file_read_count, \
  125. self.total_file_write_count, self.success_file_write_count, \
  126. last_source_sync, last_sync_update, last_synced_time, \
  127. last_heartbeat_time, self.if_trunk_server) \
  128. = struct.unpack(self.fmt, bytes_stream)
  129. try:
  130. self.id = id.strip('\x00')
  131. self.ip_addr = ip_addr.strip('\x00')
  132. self.domain_name = domain_name.strip('\x00')
  133. self.version = version.strip('\x00')
  134. self.src_ip_addr = src_ip_addr.strip('\x00')
  135. self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
  136. self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
  137. except ValueError, e:
  138. raise ResponseError('[-] Error: disk space overrun, can not represented it.')
  139. self.join_time = datetime.fromtimestamp(join_time).isoformat()
  140. self.up_time = datetime.fromtimestamp(up_time).isoformat()
  141. self.last_source_sync = datetime.fromtimestamp(last_source_sync).isoformat()
  142. self.last_sync_update = datetime.fromtimestamp(last_sync_update).isoformat()
  143. self.last_synced_time = datetime.fromtimestamp(last_synced_time).isoformat()
  144. self.last_heartbeat_time = \
  145. datetime.fromtimestamp(last_heartbeat_time).isoformat()
  146. return True
  147. def __str__(self):
  148. '''Transform to readable string.'''
  149. s = 'Storage information:\n'
  150. s += '\tid = %s\n' % (self.id)
  151. s += '\tip_addr = %s (%s)\n' % (self.ip_addr, parse_storage_status(self.status))
  152. s += '\thttp domain = %s\n' % self.domain_name
  153. s += '\tversion = %s\n' % self.version
  154. s += '\tjoin time = %s\n' % self.join_time
  155. s += '\tup time = %s\n' % self.up_time
  156. s += '\ttotal storage = %s\n' % self.totalMB
  157. s += '\tfree storage = %s\n' % self.freeMB
  158. s += '\tupload priority = %d\n' % self.upload_prio
  159. s += '\tstore path count = %d\n' % self.store_path_count
  160. s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
  161. s += '\tstorage port = %d\n' % self.storage_port
  162. s += '\tstorage HTTP port = %d\n' % self.storage_http_port
  163. s += '\tcurrent write path = %d\n' % self.curr_write_path
  164. s += '\tsource ip_addr = %s\n' % self.src_ip_addr
  165. s += '\tif_trunk_server = %d\n' % self.if_trunk_server
  166. s += '\ttotal upload count = %ld\n' % self.total_upload_count
  167. s += '\tsuccess upload count = %ld\n' % self.success_upload_count
  168. s += '\ttotal download count = %ld\n' % self.total_download_count
  169. s += '\tsuccess download count = %ld\n' % self.success_download_count
  170. s += '\ttotal append count = %ld\n' % self.total_append_count
  171. s += '\tsuccess append count = %ld\n' % self.success_append_count
  172. s += '\ttotal modify count = %ld\n' % self.total_modify_count
  173. s += '\tsuccess modify count = %ld\n' % self.success_modify_count
  174. s += '\ttotal truncate count = %ld\n' % self.total_truncate_count
  175. s += '\tsuccess truncate count = %ld\n' % self.success_truncate_count
  176. s += '\ttotal delete count = %ld\n' % self.total_del_count
  177. s += '\tsuccess delete count = %ld\n' % self.success_del_count
  178. s += '\ttotal set_meta count = %ld\n' % self.total_setmeta_count
  179. s += '\tsuccess set_meta count = %ld\n' % self.success_setmeta_count
  180. s += '\ttotal get_meta count = %ld\n' % self.total_getmeta_count
  181. s += '\tsuccess get_meta count = %ld\n' % self.success_getmeta_count
  182. s += '\ttotal create link count = %ld\n' % self.total_create_link_count
  183. s += '\tsuccess create link count = %ld\n' % self.success_create_link_count
  184. s += '\ttotal delete link count = %ld\n' % self.total_del_link_count
  185. s += '\tsuccess delete link count = %ld\n' % self.success_del_link_count
  186. s += '\ttotal upload bytes = %ld\n' % self.total_upload_bytes
  187. s += '\tsuccess upload bytes = %ld\n' % self.success_upload_bytes
  188. s += '\ttotal download bytes = %ld\n' % self.total_download_bytes
  189. s += '\tsuccess download bytes = %ld\n' % self.success_download_bytes
  190. s += '\ttotal append bytes = %ld\n' % self.total_append_bytes
  191. s += '\tsuccess append bytes = %ld\n' % self.success_append_bytes
  192. s += '\ttotal modify bytes = %ld\n' % self.total_modify_bytes
  193. s += '\tsuccess modify bytes = %ld\n' % self.success_modify_bytes
  194. s += '\ttotal sync_in bytes = %ld\n' % self.total_sync_in_bytes
  195. s += '\tsuccess sync_in bytes = %ld\n' % self.success_sync_in_bytes
  196. s += '\ttotal sync_out bytes = %ld\n' % self.total_sync_out_bytes
  197. s += '\tsuccess sync_out bytes = %ld\n' % self.success_sync_out_bytes
  198. s += '\ttotal file open count = %ld\n' % self.total_file_open_count
  199. s += '\tsuccess file open count = %ld\n' % self.success_file_open_count
  200. s += '\ttotal file read count = %ld\n' % self.total_file_read_count
  201. s += '\tsuccess file read count = %ld\n' % self.success_file_read_count
  202. s += '\ttotal file write count = %ld\n' % self.total_file_write_count
  203. s += '\tsucess file write count = %ld\n' % self.success_file_write_count
  204. s += '\tlast heartbeat time = %s\n' % self.last_heartbeat_time
  205. s += '\tlast source update = %s\n' % self.last_source_sync
  206. s += '\tlast sync update = %s\n' % self.last_sync_update
  207. s += '\tlast synced time = %s\n' % self.last_synced_time
  208. return s
  209. def get_fmt_size(self):
  210. return struct.calcsize(self.fmt)
  211. class Group_info(object):
  212. def __init__(self):
  213. self.group_name = ''
  214. self.totalMB = ''
  215. self.freeMB = ''
  216. self.trunk_freeMB = ''
  217. self.count = 0
  218. self.storage_port = 0
  219. self.store_http_port = 0
  220. self.active_count = 0
  221. self.curr_write_server = 0
  222. self.store_path_count = 0
  223. self.subdir_count_per_path = 0
  224. self.curr_trunk_file_id = 0
  225. self.fmt = '!%ds 11Q' % (FDFS_GROUP_NAME_MAX_LEN + 1)
  226. return None
  227. def __str__(self):
  228. s = 'Group information:\n'
  229. s += '\tgroup name = %s\n' % self.group_name
  230. s += '\tdisk total space = %s\n' % self.totalMB
  231. s += '\tdisk free space = %s\n' % self.freeMB
  232. s += '\ttrunk free space = %s\n' % self.trunk_freeMB
  233. s += '\tstorage server count = %d\n' % self.count
  234. s += '\tstorage port = %d\n' % self.storage_port
  235. s += '\tstorage HTTP port = %d\n' % self.store_http_port
  236. s += '\tactive server count = %d\n' % self.active_count
  237. s += '\tcurrent write server index = %d\n' % self.curr_write_server
  238. s += '\tstore path count = %d\n' % self.store_path_count
  239. s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
  240. s += '\tcurrent trunk file id = %d\n' % self.curr_trunk_file_id
  241. return s
  242. def set_info(self, bytes_stream):
  243. (group_name, totalMB, freeMB, trunk_freeMB, self.count, self.storage_port, \
  244. self.store_http_port, self.active_count, self.curr_write_server, \
  245. self.store_path_count, self.subdir_count_per_path, self.curr_trunk_file_id) \
  246. = struct.unpack(self.fmt, bytes_stream)
  247. try:
  248. self.group_name = group_name.strip('\x00')
  249. self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
  250. self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
  251. self.trunk_freeMB = appromix(trunk_freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
  252. except ValueError:
  253. raise DataError('[-] Error disk space overrun, can not represented it.')
  254. def get_fmt_size(self):
  255. return struct.calcsize(self.fmt)
  256. class Tracker_client(object):
  257. '''Class Tracker client.'''
  258. def __init__(self, pool):
  259. self.pool = pool
  260. def tracker_list_servers(self, group_name, storage_ip = None):
  261. '''
  262. List servers in a storage group
  263. '''
  264. conn = self.pool.get_connection()
  265. th = Tracker_header()
  266. ip_len = len(storage_ip) if storage_ip else 0
  267. if ip_len >= IP_ADDRESS_SIZE:
  268. ip_len = IP_ADDRESS_SIZE - 1
  269. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + ip_len
  270. th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_STORAGE
  271. group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
  272. store_ip_addr = storage_ip or ''
  273. storage_ip_fmt = '!%ds' % ip_len
  274. try:
  275. th.send_header(conn)
  276. send_buffer = struct.pack(group_fmt, group_name) + \
  277. struct.pack(storage_ip_fmt, store_ip_addr)
  278. tcp_send_data(conn, send_buffer)
  279. th.recv_header(conn)
  280. if th.status != 0:
  281. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  282. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  283. si = Storage_info()
  284. si_fmt_size = si.get_fmt_size()
  285. recv_size = len(recv_buffer)
  286. if recv_size % si_fmt_size != 0:
  287. errinfo = '[-] Error: response size not match, expect: %d, actual: %d' \
  288. % (th.pkg_len, recv_size)
  289. raise ResponseError(errinfo)
  290. except ConnectionError:
  291. conn.disconnect()
  292. raise
  293. finally:
  294. self.pool.release(conn)
  295. num_storage = recv_size / si_fmt_size
  296. si_list = []
  297. i = 0
  298. while num_storage:
  299. si.set_info(recv_buffer[(i * si_fmt_size) : ((i + 1) * si_fmt_size)])
  300. si_list.append(si)
  301. si = Storage_info()
  302. num_storage -= 1
  303. i += 1
  304. ret_dict = {}
  305. ret_dict['Group name'] = group_name
  306. ret_dict['Servers'] = si_list
  307. return ret_dict
  308. def tracker_list_one_group(self, group_name):
  309. conn = self.pool.get_connection()
  310. th = Tracker_header()
  311. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
  312. th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP
  313. #group_fmt: |-group_name(16)-|
  314. group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
  315. try:
  316. th.send_header(conn)
  317. send_buffer = struct.pack(group_fmt, group_name)
  318. tcp_send_data(conn, send_buffer)
  319. th.recv_header(conn)
  320. if th.status != 0:
  321. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  322. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  323. group_info = Group_info()
  324. group_info.set_info(recv_buffer)
  325. except ConnectionError:
  326. conn.disconnect()
  327. raise
  328. finally:
  329. self.pool.release(conn)
  330. return group_info
  331. def tracker_list_all_groups(self):
  332. conn = self.pool.get_connection()
  333. th = Tracker_header()
  334. th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS
  335. try:
  336. th.send_header(conn)
  337. th.recv_header(conn)
  338. if th.status != 0:
  339. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  340. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  341. except ConnectionError:
  342. conn.disconnect()
  343. raise
  344. finally:
  345. self.pool.release(conn)
  346. gi = Group_info()
  347. gi_fmt_size = gi.get_fmt_size()
  348. if recv_size % gi_fmt_size != 0:
  349. errmsg = '[-] Error: Response size is mismatch, except: %d, actul: %d' \
  350. % (th.pkg_len, recv_size)
  351. raise ResponseError(errmsg)
  352. num_groups = recv_size / gi_fmt_size
  353. ret_dict = {}
  354. ret_dict['Groups count'] = num_groups
  355. gi_list = []
  356. i = 0
  357. while num_groups:
  358. gi.set_info(recv_buffer[i * gi_fmt_size : (i + 1) * gi_fmt_size])
  359. gi_list.append(gi)
  360. gi = Group_info()
  361. i += 1
  362. num_groups -= 1
  363. ret_dict['Groups'] = gi_list
  364. return ret_dict
  365. def tracker_query_storage_stor_without_group(self):
  366. '''Query storage server for upload, without group name.
  367. Return: Storage_server object'''
  368. conn = self.pool.get_connection()
  369. th = Tracker_header()
  370. th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE
  371. try:
  372. th.send_header(conn)
  373. th.recv_header(conn)
  374. if th.status != 0:
  375. raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
  376. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  377. if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
  378. errmsg = '[-] Error: Tracker response length is invaild, '
  379. errmsg += 'expect: %d, actual: %d' \
  380. % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
  381. raise ResponseError(errmsg)
  382. except ConnectionError:
  383. conn.disconnect()
  384. raise
  385. finally:
  386. self.pool.release(conn)
  387. #recv_fmt |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)|
  388. recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
  389. store_serv = Storage_server()
  390. (group_name, ip_addr, \
  391. store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
  392. store_serv.group_name = group_name.strip('\x00')
  393. store_serv.ip_addr = ip_addr.strip('\x00')
  394. return store_serv
  395. def tracker_query_storage_stor_with_group(self, group_name):
  396. '''Query storage server for upload, based group name.
  397. arguments:
  398. @group_name: string
  399. @Return Storage_server object
  400. '''
  401. conn = self.pool.get_connection()
  402. th = Tracker_header()
  403. th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE
  404. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
  405. th.send_header(conn)
  406. group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
  407. send_buffer = struct.pack(group_fmt, group_name)
  408. try:
  409. tcp_send_data(conn, send_buffer)
  410. th.recv_header(conn)
  411. if th.status != 0:
  412. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  413. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  414. if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
  415. errmsg = '[-] Error: Tracker response length is invaild, '
  416. errmsg += 'expect: %d, actual: %d' \
  417. % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
  418. raise ResponseError(errmsg)
  419. except ConnectionError:
  420. conn.disconnect()
  421. raise
  422. finally:
  423. self.pool.release(conn)
  424. #recv_fmt: |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)-|
  425. recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
  426. store_serv = Storage_server()
  427. (group, ip_addr, \
  428. store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
  429. store_serv.group_name = group.strip('\x00')
  430. store_serv.ip_addr = ip_addr.strip('\x00')
  431. return store_serv
  432. def _tracker_do_query_storage(self,group_name, filename, cmd):
  433. '''
  434. core of query storage, based group name and filename.
  435. It is useful download, delete and set meta.
  436. arguments:
  437. @group_name: string
  438. @filename: string. remote file_id
  439. @Return: Storage_server object
  440. '''
  441. conn = self.pool.get_connection()
  442. th = Tracker_header()
  443. file_name_len = len(filename)
  444. th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
  445. th.cmd = cmd
  446. th.send_header(conn)
  447. #query_fmt: |-group_name(16)-filename(file_name_len)-|
  448. query_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
  449. send_buffer = struct.pack(query_fmt, group_name, filename)
  450. try:
  451. tcp_send_data(conn, send_buffer)
  452. th.recv_header(conn)
  453. if th.status != 0:
  454. raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
  455. recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
  456. if recv_size != TRACKER_QUERY_STORAGE_FETCH_BODY_LEN:
  457. errmsg = '[-] Error: Tracker response length is invaild, '
  458. errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
  459. raise ResponseError(errmsg)
  460. except ConnectionError:
  461. conn.disconnect()
  462. raise
  463. finally:
  464. self.pool.release(conn)
  465. #recv_fmt: |-group_name(16)-ip_addr(16)-port(8)-|
  466. recv_fmt = '!%ds %ds Q' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
  467. store_serv = Storage_server()
  468. (group_name, ipaddr, store_serv.port) = struct.unpack(recv_fmt, recv_buffer)
  469. store_serv.group_name = group_name.strip('\x00')
  470. store_serv.ip_addr = ipaddr.strip('\x00')
  471. return store_serv
  472. def tracker_query_storage_update(self, group_name, filename):
  473. '''
  474. Query storage server to update(delete and set_meta).
  475. '''
  476. return self._tracker_do_query_storage(group_name, filename, \
  477. TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE)
  478. def tracker_query_storage_fetch(self, group_name, filename):
  479. '''
  480. Query storage server to download.
  481. '''
  482. return self._tracker_do_query_storage(group_name, filename, \
  483. TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE)