Commit fbdc9db3643e89e26d6e49f7b4e4ec1c668a4f0d

Edward Thomson 2015-01-22T16:10:06

filters: introduce streaming filters. Add structures and preliminary functions that take a buffer, file or blob and write its contents in chunks through an arbitrary number of chained filters, finally writing into a user-provided stream that accepts the contents.

diff --git a/include/git2/filter.h b/include/git2/filter.h
index 5b3f403..600356c 100644
--- a/include/git2/filter.h
+++ b/include/git2/filter.h
@@ -137,6 +137,22 @@ GIT_EXTERN(int) git_filter_list_apply_to_blob(
 	git_filter_list *filters,
 	git_blob *blob);
 
+/**
+ * Apply a filter list to a data buffer, writing the filtered result to a
+ * stream.
+ *
+ * The whole of `data` is written to the filter chain in a single call,
+ * after which the chain is closed.
+ *
+ * @param filters The filter list to apply; if NULL, `data` is written to
+ *                `target` unfiltered
+ * @param data Buffer containing the data to filter
+ * @param target Stream that receives the filtered output
+ * @return 0 on success, or an error code
+ */
+GIT_EXTERN(int) git_filter_list_stream_data(
+	git_filter_list *filters,
+	git_buf *data,
+	git_filter_stream *target);
+
+/**
+ * Apply a filter list to the contents of a file, writing the filtered
+ * result to a stream.
+ *
+ * The file is read and passed through the filter chain in fixed-size
+ * chunks.
+ *
+ * @param filters The filter list to apply; if NULL, the file contents are
+ *                written to `target` unfiltered
+ * @param repo Repository whose working directory `path` is intended to be
+ *             relative to, or NULL
+ * @param path Path of the file to read
+ * @param target Stream that receives the filtered output
+ * @return 0 on success, or an error code
+ */
+GIT_EXTERN(int) git_filter_list_stream_file(
+	git_filter_list *filters,
+	git_repository *repo,
+	const char *path,
+	git_filter_stream *target);
+
+/**
+ * Apply a filter list to the contents of a blob, writing the filtered
+ * result to a stream.
+ *
+ * When `filters` is non-NULL, the blob's id is recorded as the filter
+ * source's oid before filtering.
+ *
+ * @param filters The filter list to apply (may be NULL)
+ * @param blob The blob whose contents are filtered
+ * @param target Stream that receives the filtered output
+ * @return 0 on success, or an error code
+ */
+GIT_EXTERN(int) git_filter_list_stream_blob(
+	git_filter_list *filters,
+	git_blob *blob,
+	git_filter_stream *target);
+
+
 /**
  * Free a git_filter_list
  *
diff --git a/include/git2/sys/filter.h b/include/git2/sys/filter.h
index 6024827..cc06c54 100644
--- a/include/git2/sys/filter.h
+++ b/include/git2/sys/filter.h
@@ -208,6 +208,19 @@ typedef int (*git_filter_apply_fn)(
 	const git_buf *from,
 	const git_filter_source *src);
 
+/**
+ * A stream that filtered data is written to.  Filters are chained by
+ * having each filter's stream write its output to the next stream.
+ */
+struct git_filter_stream {
+	/** Write `len` bytes from `buffer`; may be called any number of times. */
+	int (*write)(git_filter_stream *stream, const char *buffer, size_t len);
+	/**
+	 * Signal that all data has been written.  A stream that buffers its
+	 * input may emit its output to the next stream at this point.
+	 */
+	int (*close)(git_filter_stream *stream);
+	/** Release the stream and any resources it holds. */
+	void (*free)(git_filter_stream *stream);
+};
+
+/**
+ * Callback to create a streaming filter.
+ *
+ * @param out Receives the newly created stream
+ * @param self The filter being applied
+ * @param payload Pointer to the filter's payload for this application
+ * @param src Source of the data being filtered
+ * @param next Stream that the created stream must write its output to
+ * @return 0 on success, or a negative error code
+ */
+typedef int (*git_filter_stream_fn)(
+	git_filter_stream **out,
+	git_filter *self,
+	void **payload,
+	const git_filter_source *src,
+	git_filter_stream *next);
+
+
 /**
  * Callback to clean up after filtering has been applied
  *
@@ -247,6 +260,7 @@ struct git_filter {
 	git_filter_shutdown_fn shutdown;
 	git_filter_check_fn    check;
 	git_filter_apply_fn    apply;
+	git_filter_stream_fn   stream;
 	git_filter_cleanup_fn  cleanup;
 };
 
diff --git a/include/git2/types.h b/include/git2/types.h
index 35e1573..3ed97d1 100644
--- a/include/git2/types.h
+++ b/include/git2/types.h
@@ -410,6 +410,9 @@ typedef enum {
 	GIT_SUBMODULE_RECURSE_ONDEMAND = 2,
 } git_submodule_recurse_t;
 
+/** A writable stream: the final target of a filter chain, or one link in it. */
+typedef struct git_filter_stream git_filter_stream;
+
 /** @} */
 GIT_END_DECL
 
diff --git a/src/filter.c b/src/filter.c
index 7b54a76..1af17d8 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -703,25 +703,249 @@ int git_filter_list_apply_to_file(
 	return error;
 }
 
+/*
+ * Point `out` at the raw contents of `blob` without copying them.  The
+ * allocated size is set to 0; presumably this marks the buffer as borrowed
+ * rather than owned (confirm against git_buf).  Fails with -1 when the
+ * blob's size does not fit in a size_t.
+ */
+static int buf_from_blob(git_buf *out, git_blob *blob)
+{
+	git_off_t blob_size = git_blob_rawsize(blob);
+
+	if (git__is_sizet(blob_size)) {
+		out->ptr   = (char *)git_blob_rawcontent(blob);
+		out->size  = (size_t)blob_size;
+		out->asize = 0;
+		return 0;
+	}
+
+	giterr_set(GITERR_OS, "Blob is too large to filter");
+	return -1;
+}
+
+/*
+ * Apply `filters` to the contents of `blob`, writing the result into `out`.
+ * The blob's raw content is used in place (not copied), and when `filters`
+ * is non-NULL the blob's id is recorded as the filter source's oid first.
+ */
 int git_filter_list_apply_to_blob(
 	git_buf *out,
 	git_filter_list *filters,
 	git_blob *blob)
 {
 	git_buf in = GIT_BUF_INIT;
-	git_off_t rawsize = git_blob_rawsize(blob);
 
-	if (!git__is_sizet(rawsize)) {
-		giterr_set(GITERR_OS, "Blob is too large to filter");
+	if (buf_from_blob(&in, blob) < 0)
 		return -1;
+
+	/* Let filters see which object they are operating on */
+	if (filters)
+		git_oid_cpy(&filters->source.oid, git_blob_id(blob));
+
+	return git_filter_list_apply_to_data(out, filters, &in);
+}
+
+/*
+ * Adapts a filter that only implements the one-shot `apply` callback to the
+ * streaming interface: everything written is buffered in `input`, and the
+ * filter runs when the stream is closed.
+ */
+struct proxy_stream {
+	git_filter_stream base;      /* must stay first: cast to/from git_filter_stream */
+	git_filter *filter;          /* filter whose `apply` callback is invoked */
+	const git_filter_source *source;
+	void **payload;              /* the filter entry's payload slot */
+	git_buf input;               /* accumulates all data written to the stream */
+	git_buf output;              /* receives the result of `apply` */
+	git_filter_stream *target;   /* downstream stream that gets the result */
+};
+
+/* Buffer one chunk of input; the wrapped filter only runs on close. */
+static int proxy_stream_write(
+	git_filter_stream *s, const char *buffer, size_t len)
+{
+	git_buf *pending;
+
+	assert(s);
+	pending = &((struct proxy_stream *)s)->input;
+
+	return git_buf_put(pending, buffer, len);
+}
+
+/*
+ * Run the wrapped filter's one-shot `apply` over everything buffered so far.
+ * Then forward the result (or the unfiltered input on GIT_PASSTHROUGH) to
+ * the target stream and close it.  Any other non-zero result from `apply`
+ * is returned as-is, without touching the target.
+ */
+static int proxy_stream_close(git_filter_stream *s)
+{
+	struct proxy_stream *ps = (struct proxy_stream *)s;
+	git_buf *result;
+	int error;
+
+	assert(ps);
+
+	error = ps->filter->apply(
+		ps->filter, ps->payload, &ps->output, &ps->input, ps->source);
+
+	if (error != 0 && error != GIT_PASSTHROUGH)
+		return error;
+
+	if (error == GIT_PASSTHROUGH) {
+		result = &ps->input;
+	} else {
+		git_buf_sanitize(&ps->output);
+		result = &ps->output;
+	}
+
+	error = ps->target->write(ps->target, result->ptr, result->size);
+	if (error != 0)
+		return error;
+
+	return ps->target->close(ps->target);
+}
+
+/* Release both buffers owned by a proxy stream, then the stream itself. */
+static void proxy_stream_free(git_filter_stream *s)
+{
+	struct proxy_stream *ps = (struct proxy_stream *)s;
+
+	assert(ps);
+
+	git_buf_free(&ps->output);
+	git_buf_free(&ps->input);
+	git__free(ps);
+}
+
+/*
+ * git_filter_stream_fn used for filters that only provide `apply`.  It
+ * allocates a proxy_stream that buffers its input and, on close, hands it
+ * to `filter->apply` and writes the result to `target`.
+ */
+static int proxy_stream_init(
+	git_filter_stream **out,
+	git_filter *filter,
+	void **payload,
+	const git_filter_source *source,
+	git_filter_stream *target)
+{
+	struct proxy_stream *ps;
+
+	ps = git__calloc(1, sizeof(*ps));
+	GITERR_CHECK_ALLOC(ps);
+
+	ps->filter = filter;
+	ps->payload = payload;
+	ps->source = source;
+	ps->target = target;
+
+	ps->base.write = proxy_stream_write;
+	ps->base.close = proxy_stream_close;
+	ps->base.free = proxy_stream_free;
+
+	*out = &ps->base;
+	return 0;
+}
+
+/*
+ * Build the chain of filter streams that ends in `target`, and return its
+ * head in `*out`.
+ *
+ * Every stream created is appended to `streams`, which the caller owns and
+ * must release with stream_list_free.  On failure, the streams created so
+ * far are still in `streams`, so the caller's cleanup frees them.  With no
+ * filter list, `target` itself is returned.
+ */
+static int stream_list_init(
+	git_filter_stream **out,
+	git_vector *streams,
+	git_filter_list *filters,
+	git_filter_stream *target)
+{
+	git_filter_stream *last_stream = target;
+	size_t i;
+	int error = 0;
+
+	*out = NULL;
+
+	if (!filters) {
+		*out = target;
+		return 0;
+	}
+
+	/* Create filters last to first to get the chaining direction */
+	for (i = 0; i < git_array_size(filters->filters); ++i) {
+		size_t filter_idx = (filters->source.mode == GIT_FILTER_TO_WORKTREE) ?
+			git_array_size(filters->filters) - 1 - i : i;
+		git_filter_entry *fe = git_array_get(filters->filters, filter_idx);
+		git_filter_stream *filter_stream;
+		git_filter_stream_fn stream_init;
+
+		assert(fe->filter->stream || fe->filter->apply);
+
+		/* If necessary, create a stream that proxies the one-shot apply */
+		stream_init = fe->filter->stream ?
+			fe->filter->stream : proxy_stream_init;
+
+		if ((error = stream_init(&filter_stream, fe->filter,
+				&fe->payload, &filters->source, last_stream)) < 0)
+			return error;
+
+		/* Record the stream in the caller's vector so it gets freed */
+		if ((error = git_vector_insert(streams, filter_stream)) < 0) {
+			filter_stream->free(filter_stream);
+			return error;
+		}
+
+		last_stream = filter_stream;
 	}
 
-	in.ptr   = (char *)git_blob_rawcontent(blob);
-	in.asize = 0;
-	in.size  = (size_t)rawsize;
+	*out = last_stream;
+	return 0;
+}
+
+/*
+ * Free every stream held in `streams`, then the vector's own storage.
+ * File-local: this helper is not part of the public API.
+ */
+static void stream_list_free(git_vector *streams)
+{
+	git_filter_stream *stream;
+	size_t i;
+
+	git_vector_foreach(streams, i, stream)
+		stream->free(stream);
+
+	git_vector_free(streams);
+}
+
+/* Size of each chunk read from disk and written into the filter chain */
+#define STREAM_BUFSIZE 10240
+
+/* TODO: maybe not use filter_stream as a target but create one */
+/*
+ * Read the file at `path` in STREAM_BUFSIZE chunks and write them through
+ * `filters` into `target`.  When `repo` is non-NULL, `path` is joined onto
+ * its working directory.  On success, the head of the stream chain is
+ * closed.  Returns 0 on success or an error code.
+ */
+int git_filter_list_stream_file(
+	git_filter_list *filters,
+	git_repository *repo,
+	const char *path,
+	git_filter_stream *target)
+{
+	char buf[STREAM_BUFSIZE];
+	git_buf abspath = GIT_BUF_INIT;
+	const char *base = repo ? git_repository_workdir(repo) : NULL;
+	git_vector filter_streams = GIT_VECTOR_INIT;
+	git_filter_stream *stream_start;
+	ssize_t readlen;
+	int fd = -1, error;
+
+	if ((error = stream_list_init(
+			&stream_start, &filter_streams, filters, target)) < 0 ||
+		(error = git_path_join_unrooted(&abspath, path, base, NULL)) < 0)
+		goto done;
+
+	/* Open the resolved path, not the caller's (possibly relative) one */
+	if ((fd = git_futils_open_ro(abspath.ptr)) < 0) {
+		error = fd;
+		goto done;
+	}
+
+	/* Forward exactly the bytes just read, chunk by chunk */
+	while ((readlen = p_read(fd, buf, sizeof(buf))) > 0) {
+		if ((error = stream_start->write(
+				stream_start, buf, (size_t)readlen)) < 0)
+			goto done;
+	}
+
+	if (readlen < 0) {
+		giterr_set(GITERR_OS, "Failed to read file '%s'", abspath.ptr);
+		error = (int)readlen;
+	} else {
+		error = stream_start->close(stream_start);
+	}
+
+done:
+	/* The fd must be closed on every exit path, including write failures */
+	if (fd >= 0)
+		p_close(fd);
+	stream_list_free(&filter_streams);
+	git_buf_free(&abspath);
+	return error;
+}
+
+/*
+ * Write the whole of `data` through `filters` into `target` in one call,
+ * then close the chain.  The intermediate filter streams are released
+ * before returning.  Returns 0 on success or the first error encountered.
+ */
+int git_filter_list_stream_data(
+	git_filter_list *filters,
+	git_buf *data,
+	git_filter_stream *target)
+{
+	git_vector filter_streams = GIT_VECTOR_INIT;
+	git_filter_stream *head;
+	int error;
+
+	error = stream_list_init(&head, &filter_streams, filters, target);
+
+	if (!error)
+		error = head->write(head, data->ptr, data->size);
+
+	if (!error)
+		error = head->close(head);
+
+	stream_list_free(&filter_streams);
+	return error;
+}
+
+/*
+ * Stream the contents of `blob` through `filters` into `target`.  The blob's
+ * raw content is used in place (not copied), and when `filters` is non-NULL
+ * the blob's id is recorded as the filter source's oid first.
+ */
+int git_filter_list_stream_blob(
+	git_filter_list *filters,
+	git_blob *blob,
+	git_filter_stream *target)
+{
+	git_buf in = GIT_BUF_INIT;
+
+	if (buf_from_blob(&in, blob) < 0)
+		return -1;
 
 	if (filters)
 		git_oid_cpy(&filters->source.oid, git_blob_id(blob));
 
-	return git_filter_list_apply_to_data(out, filters, &in);
+	return git_filter_list_stream_data(filters, &in, target);
 }