博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
FastDFS源代码分析之tracker协议分析
阅读量:6337 次
发布时间:2019-06-22

本文共 14578 字,大约阅读时间需要 48 分钟。

本篇博客主要解说fastdfs中tracker协议的解说。

fastdfs主要是存储文件。直接把整个文件存储到磁盘上,所以。简单直接。可是也有非常大的局限性。

因此,fastdfs对文件的文件夹设置和存储是最为核心的。

为什么这么突然的解说这些。由于我已经看了一段时间的fastdfs,主要结构都已经搞的比較清晰了。因此,这篇文章。我就主要一tracker这一部分的协议来分析。

其它详细介绍tracker的请百度。我就不介绍了,我就直接从

int tracker_deal_task(struct fast_task_info *pTask)
这种方法開始对每一个case分析。

1、storage心跳协议

case TRACKER_PROTO_CMD_STORAGE_BEAT:			TRACKER_CHECK_LOGINED(pTask)			result = tracker_deal_storage_beat(pTask);			break;
自然。该协议是从storage层发送给tracker层的数据包,

#define TRACKER_PROTO_CMD_STORAGE_BEAT              83  //storage heart beat
那么,storage主要是做了什么:

storage在启动的时候,会开启一个线程,该线程为

static void *tracker_report_thread_entrance(void *arg)
该函数主要是做了依据配置连接对应的它的组的tacker。做一些事情,这里有个while循环,代码例如以下

current_time = g_current_time;			if (current_time - last_beat_time >= \					g_heart_beat_interval)			{				if (tracker_heart_beat(pTrackerServer, \					&stat_chg_sync_count, \					&bServerPortChanged) != 0)				{					break;				}

也就是至少30秒钟来一次心跳。心跳包的主要数据是包头和当前storage的状态信息,

char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];
/* struct for network transfering */typedef struct{	char sz_total_upload_count[8];	char sz_success_upload_count[8];	char sz_total_append_count[8];	char sz_success_append_count[8];	char sz_total_modify_count[8];	char sz_success_modify_count[8];	char sz_total_truncate_count[8];	char sz_success_truncate_count[8];	char sz_total_set_meta_count[8];	char sz_success_set_meta_count[8];	char sz_total_delete_count[8];	char sz_success_delete_count[8];	char sz_total_download_count[8];	char sz_success_download_count[8];	char sz_total_get_meta_count[8];	char sz_success_get_meta_count[8];	char sz_total_create_link_count[8];	char sz_success_create_link_count[8];	char sz_total_delete_link_count[8];	char sz_success_delete_link_count[8];	char sz_total_upload_bytes[8];	char sz_success_upload_bytes[8];	char sz_total_append_bytes[8];	char sz_success_append_bytes[8];	char sz_total_modify_bytes[8];	char sz_success_modify_bytes[8];	char sz_total_download_bytes[8];	char sz_success_download_bytes[8];	char sz_total_sync_in_bytes[8];	char sz_success_sync_in_bytes[8];	char sz_total_sync_out_bytes[8];	char sz_success_sync_out_bytes[8];	char sz_total_file_open_count[8];	char sz_success_file_open_count[8];	char sz_total_file_read_count[8];	char sz_success_file_read_count[8];	char sz_total_file_write_count[8];	char sz_success_file_write_count[8];	char sz_last_source_update[8];	char sz_last_sync_update[8];	char sz_last_synced_timestamp[8];	char sz_last_heart_beat_time[8];} FDFSStorageStatBuff;

tracker主要是做了什么呢?

对其进行解包,然后对这个保存在本地的storage的信息进行保存到文件里,调用

status = tracker_save_storages();

调用

tracker_mem_active_store_server(pClientInfo->pGroup, \				pClientInfo->pStorage);
将这个存储服务器假设没有,就插入到group中。

最后调用

static int tracker_check_and_sync(struct fast_task_info *pTask, \			const int status)
检查对应的改变状态。并将其同步等。

(须要再具体看看)

2、报告对应同步时间

#define TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT	    89  //report src last synced time as dest server

相同在storage的report线程运行

if (sync_time_chg_count != g_sync_change_count && \				current_time - last_sync_report_time >= \					g_heart_beat_interval)			{				if (tracker_report_sync_timestamp( \					pTrackerServer, &bServerPortChanged)!=0)				{					break;				}				sync_time_chg_count = g_sync_change_count;				last_sync_report_time = current_time;			}
详细的数据包为

pEnd = g_storage_servers + g_storage_count;	for (pServer=g_storage_servers; pServer
server.id, FDFS_STORAGE_ID_MAX_SIZE); p += FDFS_STORAGE_ID_MAX_SIZE; int2buff(pServer->last_sync_src_timestamp, p); p += 4; }
也就是遍历当前进程的本组全部storageserver,和上次同步的时间戳。给trackerserver。

然后tracker的server存储结构为

pClientInfo->pGroup->last_sync_timestamps \				[src_index][dest_index] = sync_timestamp;

dest_index 值为当前连接所在组的索引值

dest_index = tracker_mem_get_storage_index(pClientInfo->pGroup,			pClientInfo->pStorage);	if (dest_index < 0 || dest_index >= pClientInfo->pGroup->count)	{		status = 0;		break;	}
由于 本链接的storage是固定不变的,而src_index就是为本组的其它storage的id索引,

首相通过id。(ip地址)找到详细的storage。然后在通过指针找到索引位置,

最后。调用

if (++g_storage_sync_time_chg_count % \			TRACKER_SYNC_TO_FILE_FREQ == 0)	{		status = tracker_save_sync_timestamps();	}	else	{		status = 0;	}	} while (0);	return tracker_check_and_sync(pTask, status);
定时保存文件和检查等

3、上报磁盘情况

#define TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE 84  //report disk usage

相同线程定时调用。

if (current_time - last_df_report_time >= \					g_stat_report_interval)			{				if (tracker_report_df_stat(pTrackerServer, \						&bServerPortChanged) != 0)				{					break;				}				last_df_report_time = current_time;			}

相同上报这些数据

for (i=0; i
sz_total_mb); long2buff(g_path_space_list[i].free_mb, pStatBuff->sz_free_mb); pStatBuff++; }
tracker这边存储在

int64_t *path_total_mbs; //total disk storage in MB

int64_t *path_free_mbs;  //free disk storage in MB

这里

path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb);		path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb);		pClientInfo->pStorage->total_mb += path_total_mbs[i];		pClientInfo->pStorage->free_mb += path_free_mbs[i];

4、storage服增加到tracker

#define TRACKER_PROTO_CMD_STORAGE_JOIN              81

storage线程相同在该处调用

if (tracker_report_join(pTrackerServer, tracker_index, \					sync_old_done) != 0)		{			sleep(g_heart_beat_interval);			continue;		}
发送的包体数据包为:

typedef struct{	char group_name[FDFS_GROUP_NAME_MAX_LEN+1];	char storage_port[FDFS_PROTO_PKG_LEN_SIZE];	char storage_http_port[FDFS_PROTO_PKG_LEN_SIZE];	char store_path_count[FDFS_PROTO_PKG_LEN_SIZE];	char subdir_count_per_path[FDFS_PROTO_PKG_LEN_SIZE];	char upload_priority[FDFS_PROTO_PKG_LEN_SIZE];	char join_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage join timestamp	char up_time[FDFS_PROTO_PKG_LEN_SIZE];   //storage service started timestamp	char version[FDFS_VERSION_SIZE];   //storage version	char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];	char init_flag;	signed char status;	char tracker_count[FDFS_PROTO_PKG_LEN_SIZE];  //all tracker server count} TrackerStorageJoinBody;
当赋值完毕后。在气候变增加

p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody);	pServerEnd = g_tracker_group.servers + g_tracker_group.server_count;	for (pServer=g_tracker_group.servers; pServer
ip_addr, pTrackerServer->ip_addr) == 0 && \ pServer->port == pTrackerServer->port) { continue; } tracker_count++; */ sprintf(p, "%s:%d", pServer->ip_addr, pServer->port); p += FDFS_PROTO_IP_PORT_SIZE; }
增加全部tracker的server信息格式为ip:port

tracker server接收

case TRACKER_PROTO_CMD_STORAGE_JOIN:			result = tracker_deal_storage_join(pTask);			break;

获取到的相关信息存储到

typedef struct{	int storage_port;	int storage_http_port;	int store_path_count;	int subdir_count_per_path;	int upload_priority;	int join_time; //storage join timestamp (create timestamp)	int up_time;   //storage service started timestamp        char version[FDFS_VERSION_SIZE];   //storage version	char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];        char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];        char init_flag;	signed char status;	int tracker_count;	ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS];} FDFSStorageJoinBody;
这些结构体内

同一时候插入本地内存

result = tracker_mem_add_group_and_storage(pClientInfo, \

pTask->client_ip, &joinBody, true);

同一时候把发消息报的id传过来

pJoinBodyResp = (TrackerStorageJoinBodyResp *)(pTask->data + \				sizeof(TrackerHeader));	memset(pJoinBodyResp, 0, sizeof(TrackerStorageJoinBodyResp));	if (pClientInfo->pStorage->psync_src_server != NULL)	{		strcpy(pJoinBodyResp->src_id, \			pClientInfo->pStorage->psync_src_server->id);	}

5、报告存储状态

#define TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS     76  //report specified storage server status
storageserver调用

int tracker_report_storage_status(ConnectionInfo *pTrackerServer, \		FDFSStorageBrief *briefServer)
内容主要是组名字

strcpy(out_buff + sizeof(TrackerHeader), g_group_name);
和简要信息

memcpy(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN, \			briefServer, sizeof(FDFSStorageBrief));
其结构体例如以下

typedef struct{	char status;	char port[4];	char id[FDFS_STORAGE_ID_MAX_SIZE];	char ip_addr[IP_ADDRESS_SIZE];} FDFSStorageBrief;

6、从tracker获取storage状态。

#define TRACKER_PROTO_CMD_STORAGE_GET_STATUS	    71  //get storage status from tracker
该协议是由client发起

调用流程例如以下:

int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \		const char *group_name, const char *ip_addr, \		FDFSStorageBrief *pDestBuff)int tracker_get_storage_max_status(TrackerServerGroup *pTrackerGroup, \		const char *group_name, const char *ip_addr, \		char *storage_id, int *status)	int tracker_get_storage_status(ConnectionInfo *pTrackerServer, \		const char *group_name, const char *ip_addr, \		FDFSStorageBrief *pDestBuff)

获取自己的状态,

包体格式   组名 ip的字符串

tracker通过获取了对应的数据。查找到storage的信息

结构体为:

typedef struct{	char status;	char port[4];	char id[FDFS_STORAGE_ID_MAX_SIZE];	char ip_addr[IP_ADDRESS_SIZE];} FDFSStorageBrief;

赋值后。返回

7、通过tracker获取storageid

#define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID    70  //get storage server id from tracker

和上以协议请求一样 groupname+ip 组成。

tracker处理方法

static int tracker_deal_get_storage_id(struct fast_task_info *pTask)

tracker最后通过

FDFSStorageIdInfo *fdfs_get_storage_id_by_ip(const char *group_name, \		const char *pIpAddr){	FDFSStorageIdInfo target;	memset(&target, 0, sizeof(FDFSStorageIdInfo));	snprintf(target.group_name, sizeof(target.group_name), "%s", group_name);	snprintf(target.ip_addr, sizeof(target.ip_addr), "%s", pIpAddr);	return (FDFSStorageIdInfo *)bsearch(&target, g_storage_ids_by_ip, \		g_storage_id_count, sizeof(FDFSStorageIdInfo), \		fdfs_cmp_group_name_and_ip);}
该方法获取了了
FDFSStorageIdInfo

信息。然后赋值,返回。

8、通过tracker获取全部storageserver

#define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69  //get all storage ids from tracker
for (i=0; i<5; i++)	{		for (pGServer=pServerStart; pGServer
sock = -1; result = fdfs_get_storage_ids_from_tracker_server(pTServer); if (result == 0) { return result; } } if (pServerStart != pTrackerGroup->servers) { pServerStart = pTrackerGroup->servers; } sleep(1); }

调用顺序

int storage_func_init(const char *filename, char *bind_addr, const int addr_size)int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)

tracker函数。每秒钟中调用。遍历全部的trackersserver

trackerserver获取

case TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS:			result = tracker_deal_fetch_storage_ids(pTask);			break;
然后通过这样的协议格式

返回的数据

pIdsStart = g_storage_ids_by_ip + start_index;	pIdsEnd = g_storage_ids_by_ip + g_storage_id_count;	for (pIdInfo = pIdsStart; pIdInfo < pIdsEnd; pIdInfo++)	{		if ((int)(p - pTask->data) > pTask->size - 64)		{			break;		}		p += sprintf(p, "%s %s %s\n", pIdInfo->id, \			pIdInfo->group_name, pIdInfo->ip_addr);	}

 返回给请求者。

9、回复给新的storage

#define TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG       85  //repl new storage servers

storageserver调用流程:

剩下的协议

static int tracker_merge_servers(ConnectionInfo *pTrackerServer, \		FDFSStorageBrief *briefServers, const int server_count)

case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE:			result = tracker_deal_service_query_fetch_update( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE:			result = tracker_deal_service_query_fetch_update( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL:			result = tracker_deal_service_query_fetch_update( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE:			result = tracker_deal_service_query_storage( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE:			result = tracker_deal_service_query_storage( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL:			result = tracker_deal_service_query_storage( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL:			result = tracker_deal_service_query_storage( \					pTask, pHeader->cmd);			break;		case TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP:			result = tracker_deal_server_list_one_group(pTask);			break;		case TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS:			result = tracker_deal_server_list_all_groups(pTask);			break;		case TRACKER_PROTO_CMD_SERVER_LIST_STORAGE:			result = tracker_deal_server_list_group_storages(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ:			result = tracker_deal_storage_sync_src_req(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ:			TRACKER_CHECK_LOGINED(pTask)			result = tracker_deal_storage_sync_dest_req(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY:			result = tracker_deal_storage_sync_notify(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY:			result = tracker_deal_storage_sync_dest_query(pTask);			break;		case TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE:			result = tracker_deal_server_delete_storage(pTask);			break;		case TRACKER_PROTO_CMD_SERVER_SET_TRUNK_SERVER:			result = tracker_deal_server_set_trunk_server(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED:			result = tracker_deal_storage_report_ip_changed(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ:			result = tracker_deal_changelog_req(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ:			result = tracker_deal_parameter_req(pTask);			break;		case FDFS_PROTO_CMD_QUIT:			close(pTask->ev_read.ev_fd);			task_finish_clean_up(pTask);			return 0;		case FDFS_PROTO_CMD_ACTIVE_TEST:			result = tracker_deal_active_test(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_GET_STATUS:			result = tracker_deal_get_tracker_status(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START:			result = tracker_deal_get_sys_files_start(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE:			result = tracker_deal_get_one_sys_file(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END:			result = tracker_deal_get_sys_files_end(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID:			TRACKER_CHECK_LOGINED(pTask)			result = tracker_deal_report_trunk_fid(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID:			TRACKER_CHECK_LOGINED(pTask)			result = tracker_deal_get_trunk_fid(pTask);			break;		case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE:			TRACKER_CHECK_LOGINED(pTask)			result = tracker_deal_report_trunk_free_space(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_PING_LEADER:			result = tracker_deal_ping_leader(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER:			result = tracker_deal_notify_next_leader(pTask);			break;		case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER:			result = tracker_deal_commit_next_leader(pTask);			break;

转载地址:http://iqxoa.baihongyu.com/

你可能感兴趣的文章
音频视频组件Audio DJ Studio for .NET更新至v10.0.0.0丨附下载
查看>>
线上问题处理(一)
查看>>
在Centos7 系统中使用supermin制作CentOS6 的Docker镜像
查看>>
select2和bootstrap模态框使用时出现的bug以及解决方案
查看>>
EOS cleos 链接节点命令
查看>>
4.39-Nginx日志不记录静态文件
查看>>
写出企业要求的 Python 代码风格
查看>>
OSChina 周四乱弹 —— 如何正确地请客吃饭
查看>>
OSChina 周三乱弹 ——所有的酒,都不如你
查看>>
Pig的输入输出及foreach,group关系操作
查看>>
TechParty - Code For Public - sz
查看>>
emacs 前端插件推荐[emmet-mode]
查看>>
dnsmasq配置文件
查看>>
Unity链接SqlServer数据库并进行简单的数据查询
查看>>
23种设计模式
查看>>
原生javascript学习:用循环改变div颜色
查看>>
ABBYY FineReader 12内置的自动化任务
查看>>
ab 测试 和 apache 修改 并发数 mpm
查看>>
Nginx 的软件负载均衡详解
查看>>
TIMED OUT WAITING FOR OHASD MONITOR
查看>>