Compare commits

..

No commits in common. "multiprocess" and "master" have entirely different histories.

1 changed files with 93 additions and 124 deletions

View File

@ -25,7 +25,6 @@
#include <sys/time.h>
#include <limits.h>
#include <syslog.h>
#include <sys/wait.h>
#include <netinet/tcp.h>
#include <fcntl.h>
@ -38,6 +37,7 @@
typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
static const char *filename;
static const char *addr;
struct ev_loop *tip_main_loop;
@ -52,7 +52,6 @@ enum {
struct tip_client {
ev_io io;
struct sockaddr_in addr;
const char *filename;
char buf[10240000];
uint32_t buf_len;
uint64_t data_len;
@ -122,12 +121,12 @@ static void tip_client_progress(struct tip_client *cli, bool now)
printf("%3lu%% (%lu Mbytes/second) %s from %s:9999\n",
cli->content_len > 0 ? 100 * cli->data_len / cli->content_len : 0,
tv.tv_sec > 0 ? cli->data_len / 1024000 / tv.tv_sec : cli->data_len / 1024000,
cli->filename, inet_ntoa(cli->addr.sin_addr));
filename, inet_ntoa(cli->addr.sin_addr));
cli->tv_last = tv_cur;
}
}
static int tip_client_connect(struct tip_client *cli, const char *addr);
static int tip_client_connect(const char *addr);
static int tip_client_get_hdr(struct tip_client *cli)
{
@ -142,7 +141,7 @@ static int tip_client_get_hdr(struct tip_client *cli)
return 0;
if (!strncmp(cli->buf, "HTTP/1.1 404 Not Found", strlen("HTTP/1.1 404 Not Found"))) {
syslog(LOG_ERR, "server says file `%s' not found\n", cli->filename);
syslog(LOG_ERR, "server says file `%s' not found\n", filename);
return -1;
}
if (!strncmp(cli->buf, "HTTP/1.1 301 Moves Permanently", strlen("HTTP/1.1 301 Moves Permanently"))) {
@ -162,11 +161,11 @@ static int tip_client_get_hdr(struct tip_client *cli)
ptr[0] = '\0';
syslog(LOG_INFO, "Redirected to %s to fetch file %s\n",
redirect_addr, cli->filename);
redirect_addr, filename);
cli->redirected = true;
tip_client_close(cli);
tip_client_connect(cli, redirect_addr);
tip_client_connect(redirect_addr);
cli->state = TIP_CLIENT_GET_HEADER;
return 0;
@ -203,7 +202,7 @@ static int tip_client_get_hdr(struct tip_client *cli)
ret = write(cli->fd, payload, payload_len);
if (ret < 0) {
syslog(LOG_ERR, "failed to write to file %s: %s",
cli->filename, strerror(errno));
filename, strerror(errno));
return ret;
}
}
@ -220,7 +219,7 @@ static int tip_client_get_payload(struct tip_client *cli)
ret = write(cli->fd, cli->buf, cli->buf_len);
if (ret < 0) {
syslog(LOG_ERR, "failed to write to file %s: %s",
cli->filename, strerror(errno));
filename, strerror(errno));
return ret;
}
@ -231,7 +230,7 @@ static int tip_client_get_payload(struct tip_client *cli)
if (cli->data_len >= cli->content_len) {
if (cli->redirected) {
tip_client_close(cli);
tip_client_connect(cli, addr);
tip_client_connect(addr);
cli->state = TIP_CLIENT_POST_REDIRECT;
return 1;
@ -263,7 +262,7 @@ static int tip_client_head_hdr(struct tip_client *cli)
return 0;
if (!strncmp(cli->buf, "HTTP/1.1 404 Not Found", strlen("HTTP/1.1 404 Not Found"))) {
syslog(LOG_ERR, "server says file `%s' not found\n", cli->filename);
syslog(LOG_ERR, "server says file `%s' not found\n", filename);
return -1;
}
@ -276,21 +275,21 @@ static int tip_client_head_hdr(struct tip_client *cli)
if (cli->content_len < 0)
return -1;
if (cli->content_len == 0) {
syslog(LOG_ERR, "server reports zero size file %s", cli->filename);
syslog(LOG_ERR, "server reports zero size file %s", filename);
return -1;
}
cli->fd = open(cli->filename, O_WRONLY | O_CREAT | O_TRUNC, 0600);
cli->fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0600);
if (cli->fd < 0) {
syslog(LOG_ERR, "failed to open file %s: %s",
cli->filename, strerror(errno));
filename, strerror(errno));
return -1;
}
if (fallocate(cli->fd, 0, 0, cli->content_len) < 0) {
syslog(LOG_ERR, "failed to allocate room for file %s: %s",
cli->filename, strerror(errno));
delete_file(cli->filename);
filename, strerror(errno));
delete_file(filename);
cli->fatal = true;
return -1;
}
@ -403,7 +402,7 @@ static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int ev
len = sizeof(cli->addr);
ret = connect(cli->io.fd, (struct sockaddr *)&cli->addr, len);
if (ret < 0) {
syslog(LOG_ERR, "failed to connect to server to fetch %s", cli->filename);
syslog(LOG_ERR, "failed to connect to server to fetch %s", filename);
tip_client_error(cli);
return;
}
@ -411,27 +410,27 @@ static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int ev
switch (cli->state) {
case TIP_CLIENT_GET_HEADER:
syslog(LOG_INFO, "connected to %s to fetch file %s\n",
inet_ntoa(cli->addr.sin_addr), cli->filename);
inet_ntoa(cli->addr.sin_addr), filename);
if (cli->server_only)
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\nX-Accept-Redirect: off\r\n\r\n", cli->filename);
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\nX-Accept-Redirect: off\r\n\r\n", filename);
else
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n\r\n", cli->filename);
snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n\r\n", filename);
break;
case TIP_CLIENT_HEAD_HEADER:
syslog(LOG_INFO, "connected to %s to get file size of %s\n",
inet_ntoa(cli->addr.sin_addr), cli->filename);
snprintf(buf, sizeof(buf), "HEAD /%s HTTP/1.1\r\n\r\n", cli->filename);
inet_ntoa(cli->addr.sin_addr), filename);
snprintf(buf, sizeof(buf), "HEAD /%s HTTP/1.1\r\n\r\n", filename);
break;
case TIP_CLIENT_POST_REDIRECT:
syslog(LOG_INFO, "connected to %s to report redirection for %s\n",
inet_ntoa(cli->addr.sin_addr), cli->filename);
snprintf(buf, sizeof(buf), "POST /%s HTTP/1.1\r\n\r\n", cli->filename);
inet_ntoa(cli->addr.sin_addr), filename);
snprintf(buf, sizeof(buf), "POST /%s HTTP/1.1\r\n\r\n", filename);
break;
}
ret = send(cli->io.fd, buf, strlen(buf), 0);
if (ret < 0) {
syslog(LOG_ERR, "failed to send request for %s", cli->filename);
syslog(LOG_ERR, "failed to send request for %s", filename);
tip_client_error(cli);
return;
}
@ -445,10 +444,11 @@ static void tip_client_connect_cb(struct ev_loop *loop, struct ev_io *io, int ev
#define TIP_TCP_KEEPALIVE_INTL 30
#define TIP_TCP_KEEPALIVE_CNT 4
static int tip_client_connect(struct tip_client *cli, const char *addr)
static int tip_client_connect(const char *addr)
{
int intl = TIP_TCP_KEEPALIVE_INTL, cnt = TIP_TCP_KEEPALIVE_CNT;
int on = 1, idle = TIP_TCP_KEEPALIVE_IDLE;
struct tip_client *cli = &_cli;
int remote_fd;
int flags;
int len;
@ -480,8 +480,7 @@ static int tip_client_connect(struct tip_client *cli, const char *addr)
len = sizeof(cli->addr);
ret = connect(remote_fd, (struct sockaddr *)&cli->addr, len);
if (ret < 0 && errno != EINPROGRESS) {
syslog(LOG_ERR, "failed to connect to server to fetch %s",
cli->filename);
syslog(LOG_ERR, "failed to connect to server to fetch %s", filename);
tip_client_error(cli);
return ret;
}
@ -498,8 +497,7 @@ static int tip_client_connect(struct tip_client *cli, const char *addr)
static int tip_client_request_file(struct tip_client *cli,
const char *server, const char *filename)
{
cli->filename = filename;
tip_client_connect(cli, server);
tip_client_connect(server);
while (cli->state != TIP_CLIENT_DONE && !cli->error)
ev_loop(tip_main_loop, 0);
@ -557,84 +555,22 @@ static void tip_client_reset_state(struct tip_client *cli, int fd,
cli->num_retries = num_retries;
}
static int tip_client_run(struct tip_client *cli, int fd, const char *addr,
const char *filename, uint32_t chunk,
uint64_t chunk_size)
{
int ret;
do {
tip_client_reset_state(cli, fd, chunk_size * chunk);
syslog(LOG_INFO, "Requesting file %s to server\n", filename);
cli->state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(cli, addr, filename);
} while (ret > 0);
if (ret < 0) {
do {
tip_client_reset_state(cli, fd, chunk_size * chunk);
cli->server_only = true;
syslog(LOG_INFO, "Requesting file %s to server only\n", filename);
cli->state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(cli, addr, filename);
} while (ret > 0);
}
if (ret < 0)
return ret;
if (cli->redirected)
tip_client_stats.redirects++;
else
tip_client_stats.direct_from_server++;
tip_client_progress(cli, true);
return 0;
}
static int tip_client_report(struct tip_client *cli, const struct timeval *tv,
uint64_t data_len, pid_t pid)
{
if (cli->state == TIP_CLIENT_DONE) {
printf("(%u) OK.\n", pid);
syslog(LOG_INFO, "(%u) Done in %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
pid, tv->tv_sec, tv->tv_usec,
tv->tv_sec > 0 ? data_len / 1024000 / tv->tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_SUCCESS;
}
printf("(%u) Failure, see syslog for details.\n", pid);
syslog(LOG_INFO, "(%u) Failure after %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
pid, tv->tv_sec, tv->tv_usec,
tv->tv_sec > 0 ? data_len / 1024000 / tv->tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_FAILURE;
}
#define NUM_CONN 2
static char _filename[PATH_MAX + 1];
int main(int argc, char *argv[])
{
uint32_t chunk_array[NUM_CONN][MAX_CHUNKS / NUM_CONN] = {};
struct timeval tv_start, tv_stop, tv;
uint64_t data_len = 0, file_size = 0;
bool file_chunk[MAX_CHUNKS] = {};
char filename[PATH_MAX + 1];
uint64_t file_size = 0;
int i, j, k, fd, ret;
uint64_t chunk_size;
int i, k, fd, ret;
if (argc != 3) {
printf("%s [ip] [file]\n", argv[0]);
return EXIT_FAILURE;
}
addr = argv[1];
filename = argv[2];
openlog("tiptorrent-client", LOG_PID, LOG_DAEMON);
@ -647,50 +583,83 @@ int main(int argc, char *argv[])
do {
tip_client_reset_state(&_cli, -1, 0);
_cli.state = TIP_CLIENT_HEAD_HEADER;
ret = tip_client_request_file(&_cli, addr, argv[2]);
ret = tip_client_request_file(&_cli, addr, filename);
} while (ret > 0);
if (ret < 0)
goto err_bailout;
if (_cli.state != TIP_CLIENT_DONE)
goto err;
fd = _cli.fd;
file_size = _cli.content_len;
chunk_size = file_size / MAX_CHUNKS;
for (i = 0; i < NUM_CONN; i++) {
for (j = 0; j < MAX_CHUNKS / NUM_CONN; j++) {
k = select_file_chunk(file_chunk);
chunk_array[i][j] = k;
file_chunk[k] = true;
for (i = 0; i < MAX_CHUNKS; i++) {
k = select_file_chunk(file_chunk);
snprintf(_filename, sizeof(_filename), "%s.%u", argv[2], k);
filename = _filename;
chunk_size = file_size / MAX_CHUNKS;
do {
tip_client_reset_state(&_cli, fd, chunk_size * k);
syslog(LOG_INFO, "Requesting file %s to server\n", filename);
_cli.state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(&_cli, addr, filename);
} while (ret > 0);
if (ret < 0) {
do {
tip_client_reset_state(&_cli, fd, chunk_size * k);
_cli.server_only = true;
syslog(LOG_INFO, "Requesting file %s to server only\n", filename);
_cli.state = TIP_CLIENT_GET_HEADER;
ret = tip_client_request_file(&_cli, addr, filename);
} while (ret > 0);
}
}
for (i = 0; i < NUM_CONN; i++) {
ret = fork();
if (ret > 0) {
for (j = 0; j < MAX_CHUNKS / NUM_CONN; j++) {
k = chunk_array[i][j];
snprintf(filename, sizeof(filename), "%s.%u", argv[2], k);
if (ret < 0)
goto err_bailout;
if (tip_client_run(&_cli, fd, addr, filename, k, chunk_size) < 0)
goto err_bailout;
}
break;
} else if (ret < 0) {
syslog(LOG_ERR, "failed to fork process");
break;
}
}
if (ret == 0) {
while (wait(NULL) > 0);
return EXIT_SUCCESS;
}
if (_cli.redirected)
tip_client_stats.redirects++;
else
tip_client_stats.direct_from_server++;
tip_client_progress(&_cli, true);
file_chunk[k] = true;
data_len += _cli.data_len;
}
close(fd);
err:
gettimeofday(&tv_stop, NULL);
timersub(&tv_stop, &tv_start, &tv);
return tip_client_report(&_cli, &tv, file_size / 2, ret);
if (data_len != file_size) {
syslog(LOG_ERR, "Failure, file size is %lu bytes but received %lu bytes!\n",
file_size, data_len);
return EXIT_FAILURE;
}
if (_cli.state == TIP_CLIENT_DONE) {
printf("OK.\n");
syslog(LOG_INFO, "Done in %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
tv.tv_sec, tv.tv_usec,
tv.tv_sec > 0 ? data_len / 1024000 / tv.tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_SUCCESS;
}
printf("Failure, see syslog for details.\n");
syslog(LOG_INFO, "Failure after %lu.%06lu seconds (%lu Mbytes/second). "
"Direct from server: %u Redirected: %u\n",
tv.tv_sec, tv.tv_usec,
tv.tv_sec > 0 ? data_len / 1024000 / tv.tv_sec : data_len / 1024000,
tip_client_stats.direct_from_server,
tip_client_stats.redirects);
return EXIT_FAILURE;
err_bailout:
if (_cli.fatal)